This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d1b06ac  [FLINK-14401][table][hive] create 
DefaultFunctionDefinitionFactory to instantiate regular java class-based udf
d1b06ac is described below

commit d1b06ac4e8ab033c699b656cb0f1d924b6bafc56
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Tue Oct 15 16:21:29 2019 -0700

    [FLINK-14401][table][hive] create DefaultFunctionDefinitionFactory to 
instantiate regular java class-based udf
    
    create FunctionDefinitionUtil to instantiate regular java class-based udf 
and add HiveFunctionDefinitionFactory to instantiate both flink and hive udf
    
    This closes #9908.
---
 .../flink/connectors/hive/HiveTableFactory.java    | 103 +---------------
 .../flink/table/catalog/hive/HiveCatalog.java      |   7 ++
 .../factories/HiveFunctionDefinitionFactory.java}  | 100 ++-------------
 .../table/tests/test_catalog_completeness.py       |   2 +-
 .../flink/table/catalog/FunctionCatalog.java       |  24 ++--
 .../table/functions/FunctionDefinitionUtil.java    |  77 ++++++++++++
 .../functions/FunctionDefinitionUtilTest.java      | 135 +++++++++++++++++++++
 .../org/apache/flink/table/catalog/Catalog.java    |  12 +-
 .../table/factories/FunctionDefinitionFactory.java |   3 +-
 9 files changed, 253 insertions(+), 210 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index 235919c..89f76da 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -18,29 +18,13 @@
 
 package org.apache.flink.connectors.hive;
 
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.hive.client.HiveShim;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
-import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.functions.AggregateFunctionDefinition;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.ScalarFunctionDefinition;
-import org.apache.flink.table.functions.TableFunctionDefinition;
-import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
-import org.apache.flink.table.functions.hive.HiveGenericUDAF;
-import org.apache.flink.table.functions.hive.HiveGenericUDF;
-import org.apache.flink.table.functions.hive.HiveGenericUDTF;
-import org.apache.flink.table.functions.hive.HiveSimpleUDF;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
@@ -49,12 +33,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.UDAF;
-import org.apache.hadoop.hive.ql.exec.UDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,20 +46,13 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * A table factory implementation for Hive catalog.
  */
 public class HiveTableFactory
-               implements TableSourceFactory<Row>, TableSinkFactory<Row>, 
FunctionDefinitionFactory {
+               implements TableSourceFactory<Row>, TableSinkFactory<Row> {
        private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableFactory.class);
 
        private final HiveConf hiveConf;
-       private final String hiveVersion;
-       private final HiveShim hiveShim;
 
        public HiveTableFactory(HiveConf hiveConf) {
                this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
-
-               // this has to come from hiveConf, otherwise we may lose what 
user specifies in the yaml file
-               this.hiveVersion = 
checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
-                               "Hive version is not defined");
-               this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
        }
 
        @Override
@@ -145,76 +116,4 @@ public class HiveTableFactory
        private OutputFormatTableSink<Row> 
createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
                return new HiveTableSink(new JobConf(hiveConf), tablePath, 
table);
        }
-
-       @Override
-       public FunctionDefinition createFunctionDefinition(String name, 
CatalogFunction catalogFunction) {
-               String functionClassName = catalogFunction.getClassName();
-
-               if 
(Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC)))
 {
-                       throw new TableException(
-                               String.format("HiveFunctionDefinitionFactory 
does not support generic functions %s yet", name));
-               }
-
-               Class clazz;
-               try {
-                       clazz = 
Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
-
-                       LOG.info("Successfully loaded Hive udf '{}' with class 
'{}'", name, functionClassName);
-               } catch (ClassNotFoundException e) {
-                       throw new TableException(
-                               String.format("Failed to initiate an instance 
of class %s.", functionClassName), e);
-               }
-
-               if (UDF.class.isAssignableFrom(clazz)) {
-                       LOG.info("Transforming Hive function '{}' into a 
HiveSimpleUDF", name);
-
-                       return new ScalarFunctionDefinition(
-                               name,
-                               new HiveSimpleUDF(new 
HiveFunctionWrapper<>(functionClassName), hiveShim)
-                       );
-               } else if (GenericUDF.class.isAssignableFrom(clazz)) {
-                       LOG.info("Transforming Hive function '{}' into a 
HiveGenericUDF", name);
-
-                       return new ScalarFunctionDefinition(
-                               name,
-                               new HiveGenericUDF(new 
HiveFunctionWrapper<>(functionClassName), hiveShim)
-                       );
-               } else if (GenericUDTF.class.isAssignableFrom(clazz)) {
-                       LOG.info("Transforming Hive function '{}' into a 
HiveGenericUDTF", name);
-
-                       HiveGenericUDTF udtf = new HiveGenericUDTF(new 
HiveFunctionWrapper<>(functionClassName), hiveShim);
-
-                       return new TableFunctionDefinition(
-                               name,
-                               udtf,
-                               GenericTypeInfo.of(Row.class)
-                       );
-               } else if (GenericUDAFResolver2.class.isAssignableFrom(clazz) 
|| UDAF.class.isAssignableFrom(clazz)) {
-                       HiveGenericUDAF udaf;
-
-                       if (GenericUDAFResolver2.class.isAssignableFrom(clazz)) 
{
-                               LOG.info(
-                                       "Transforming Hive function '{}' into a 
HiveGenericUDAF with no UDAF bridging and Hive version %s",
-                                       name, hiveVersion);
-
-                               udaf = new HiveGenericUDAF(new 
HiveFunctionWrapper<>(functionClassName), false, hiveVersion);
-                       } else {
-                               LOG.info(
-                                       "Transforming Hive function '{}' into a 
HiveGenericUDAF with UDAF bridging and Hive version %s",
-                                       name, hiveVersion);
-
-                               udaf = new HiveGenericUDAF(new 
HiveFunctionWrapper<>(functionClassName), true, hiveVersion);
-                       }
-
-                       return new AggregateFunctionDefinition(
-                               name,
-                               udaf,
-                               GenericTypeInfo.of(Object.class),
-                               
GenericTypeInfo.of(GenericUDAFEvaluator.AggregationBuffer.class)
-                       );
-               } else {
-                       throw new IllegalArgumentException(
-                               String.format("HiveFunctionDefinitionFactory 
cannot initiate FunctionDefinition for class %s", functionClassName));
-               }
-       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 7914c89..1e66551 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -56,11 +56,13 @@ import 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import 
org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveStatsUtil;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.StringUtils;
 
@@ -202,6 +204,11 @@ public class HiveCatalog extends AbstractCatalog {
                return Optional.of(new HiveTableFactory(hiveConf));
        }
 
+       @Override
+       public Optional<FunctionDefinitionFactory> 
getFunctionDefinitionFactory() {
+               return Optional.of(new HiveFunctionDefinitionFactory(hiveConf));
+       }
+
        // ------ databases ------
 
        @Override
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
similarity index 62%
copy from 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
copy to 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
index 235919c..3e32c7f 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
@@ -16,24 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connectors.hive;
+package org.apache.flink.table.catalog.hive.factories;
 
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.connectors.hive.HiveTableFactory;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
-import org.apache.flink.table.factories.TableFactoryUtil;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinitionUtil;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
@@ -41,12 +37,7 @@ import org.apache.flink.table.functions.hive.HiveGenericUDAF;
 import org.apache.flink.table.functions.hive.HiveGenericUDF;
 import org.apache.flink.table.functions.hive.HiveGenericUDTF;
 import org.apache.flink.table.functions.hive.HiveSimpleUDF;
-import org.apache.flink.table.sinks.OutputFormatTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.InputFormatTableSource;
-import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.UDAF;
@@ -55,106 +46,35 @@ import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Map;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A table factory implementation for Hive catalog.
+ * A factory to instantiate Hive UDFs as Flink UDFs.
  */
-public class HiveTableFactory
-               implements TableSourceFactory<Row>, TableSinkFactory<Row>, 
FunctionDefinitionFactory {
+public class HiveFunctionDefinitionFactory implements 
FunctionDefinitionFactory {
        private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableFactory.class);
 
-       private final HiveConf hiveConf;
        private final String hiveVersion;
        private final HiveShim hiveShim;
 
-       public HiveTableFactory(HiveConf hiveConf) {
-               this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
-
+       public HiveFunctionDefinitionFactory(HiveConf hiveConf) {
                // this has to come from hiveConf, otherwise we may lose what 
user specifies in the yaml file
                this.hiveVersion = 
checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
-                               "Hive version is not defined");
+                       "Hive version is not defined");
                this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
        }
 
        @Override
-       public Map<String, String> requiredContext() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public List<String> supportedProperties() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public TableSink<Row> createTableSink(Map<String, String> properties) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public TableSource<Row> createTableSource(Map<String, String> 
properties) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public TableSource<Row> createTableSource(ObjectPath tablePath, 
CatalogTable table) {
-               Preconditions.checkNotNull(table);
-               Preconditions.checkArgument(table instanceof CatalogTableImpl);
-
-               boolean isGeneric = 
Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
-
-               if (!isGeneric) {
-                       return createInputFormatTableSource(tablePath, table);
-               } else {
-                       return TableFactoryUtil.findAndCreateTableSource(table);
-               }
-       }
-
-       /**
-        * Creates and configures a {@link 
org.apache.flink.table.sources.InputFormatTableSource} using the given {@link 
CatalogTable}.
-        */
-       private InputFormatTableSource<Row> 
createInputFormatTableSource(ObjectPath tablePath, CatalogTable table) {
-               return new HiveTableSource(new JobConf(hiveConf), tablePath, 
table);
-       }
-
-       @Override
-       public TableSink<Row> createTableSink(ObjectPath tablePath, 
CatalogTable table) {
-               Preconditions.checkNotNull(table);
-               Preconditions.checkArgument(table instanceof CatalogTableImpl);
-
-               boolean isGeneric = 
Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
-
-               if (!isGeneric) {
-                       return createOutputFormatTableSink(tablePath, table);
-               } else {
-                       return TableFactoryUtil.findAndCreateTableSink(table);
-               }
-       }
-
-       /**
-        * Creates and configures a {@link 
org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link 
CatalogTable}.
-        */
-       private OutputFormatTableSink<Row> 
createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
-               return new HiveTableSink(new JobConf(hiveConf), tablePath, 
table);
-       }
-
-       @Override
        public FunctionDefinition createFunctionDefinition(String name, 
CatalogFunction catalogFunction) {
-               String functionClassName = catalogFunction.getClassName();
-
                if 
(Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC)))
 {
-                       throw new TableException(
-                               String.format("HiveFunctionDefinitionFactory 
does not support generic functions %s yet", name));
+                       FunctionDefinitionUtil.createFunctionDefinition(name, 
catalogFunction);
                }
 
+               String functionClassName = catalogFunction.getClassName();
+
                Class clazz;
                try {
                        clazz = 
Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py 
b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index 9474c30..003c144 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -40,7 +40,7 @@ class 
CatalogAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCa
     @classmethod
     def excluded_methods(cls):
         # open/close are not needed in Python API as they are used internally
-        return {'open', 'close', 'getTableFactory'}
+        return {'open', 'close', 'getTableFactory', 
'getFunctionDefinitionFactory'}
 
 
 class CatalogDatabaseAPICompletenessTests(PythonAPICompletenessTestCase, 
unittest.TestCase):
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 43280b4..6808216 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
-import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinitionUtil;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableAggregateFunction;
@@ -242,21 +242,17 @@ public class FunctionCatalog implements FunctionLookup {
                        CatalogFunction catalogFunction = catalog.getFunction(
                                new 
ObjectPath(catalogManager.getCurrentDatabase(), functionName));
 
-                       if (catalog.getTableFactory().isPresent() &&
-                               catalog.getTableFactory().get() instanceof 
FunctionDefinitionFactory) {
-
-                               FunctionDefinitionFactory factory = 
(FunctionDefinitionFactory) catalog.getTableFactory().get();
-
-                               userCandidate = 
factory.createFunctionDefinition(functionName, catalogFunction);
-
-                               return Optional.of(
-                                       new FunctionLookup.Result(
-                                               
ObjectIdentifier.of(catalogManager.getCurrentCatalog(), 
catalogManager.getCurrentDatabase(), name),
-                                               userCandidate)
-                               );
+                       if (catalog.getFunctionDefinitionFactory().isPresent()) 
{
+                               userCandidate = 
catalog.getFunctionDefinitionFactory().get().createFunctionDefinition(functionName,
 catalogFunction);
                        } else {
-                               // TODO: should go through function definition 
discover service
+                               userCandidate = 
FunctionDefinitionUtil.createFunctionDefinition(functionName, catalogFunction);
                        }
+
+                       return Optional.of(
+                               new FunctionLookup.Result(
+                                       
ObjectIdentifier.of(catalogManager.getCurrentCatalog(), 
catalogManager.getCurrentDatabase(), name),
+                                       userCandidate)
+                       );
                } catch (FunctionNotExistException e) {
                        // Ignore
                }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
new file mode 100644
index 0000000..98cedaf
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.table.catalog.CatalogFunction;
+
+/**
+ * A util to instantiate {@link FunctionDefinition} in the default way.
+ */
+public class FunctionDefinitionUtil {
+
+       public static FunctionDefinition createFunctionDefinition(String name, 
CatalogFunction catalogFunction) {
+               // Currently only handles Java class-based functions
+               Object func;
+               try {
+                       func = 
Thread.currentThread().getContextClassLoader().loadClass(catalogFunction.getClassName()).newInstance();
+               } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
+                       throw new IllegalStateException(
+                               String.format("Failed instantiating '%s'", 
catalogFunction.getClassName())
+                       );
+               }
+
+               UserDefinedFunction udf = (UserDefinedFunction) func;
+
+               if (udf instanceof ScalarFunction) {
+                       return new ScalarFunctionDefinition(
+                               name,
+                               (ScalarFunction) udf
+                       );
+               } else if (udf instanceof TableFunction) {
+                       TableFunction t = (TableFunction) udf;
+                       return new TableFunctionDefinition(
+                               name,
+                               t,
+                               t.getResultType()
+                       );
+               } else if (udf instanceof AggregateFunction) {
+                       AggregateFunction a = (AggregateFunction) udf;
+
+                       return new AggregateFunctionDefinition(
+                               name,
+                               a,
+                               a.getAccumulatorType(),
+                               a.getResultType()
+                       );
+               } else if (udf instanceof TableAggregateFunction) {
+                       TableAggregateFunction a = (TableAggregateFunction) udf;
+
+                       return new TableAggregateFunctionDefinition(
+                               name,
+                               a,
+                               a.getAccumulatorType(),
+                               a.getResultType()
+                       );
+               } else {
+                       throw new UnsupportedOperationException(
+                               String.format("Function %s should be of 
ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction", 
catalogFunction.getClassName())
+                       );
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
new file mode 100644
index 0000000..84cb81c
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link FunctionDefinitionUtil}.
+ */
+public class FunctionDefinitionUtilTest {
+       @Test
+       public void testScalarFunction() {
+               FunctionDefinition fd = 
FunctionDefinitionUtil.createFunctionDefinition(
+                       "test",
+                               new 
CatalogFunctionImpl(TestScalarFunction.class.getName(), Collections.emptyMap())
+               );
+
+               assertTrue(((ScalarFunctionDefinition) fd).getScalarFunction() 
instanceof TestScalarFunction);
+       }
+
+       @Test
+       public void testTableFunction() {
+               FunctionDefinition fd = 
FunctionDefinitionUtil.createFunctionDefinition(
+                       "test",
+                       new 
CatalogFunctionImpl(TestTableFunction.class.getName(), Collections.emptyMap())
+               );
+
+               assertTrue(((TableFunctionDefinition) fd).getTableFunction() 
instanceof TestTableFunction);
+       }
+
+       @Test
+       public void testAggregateFunction() {
+               FunctionDefinition fd = 
FunctionDefinitionUtil.createFunctionDefinition(
+                       "test",
+                       new 
CatalogFunctionImpl(TestAggFunction.class.getName(), Collections.emptyMap())
+               );
+
+               assertTrue(((AggregateFunctionDefinition) 
fd).getAggregateFunction() instanceof TestAggFunction);
+       }
+
+       @Test
+       public void testTableAggregateFunction() {
+               FunctionDefinition fd = 
FunctionDefinitionUtil.createFunctionDefinition(
+                       "test",
+                       new 
CatalogFunctionImpl(TestTableAggFunction.class.getName(), 
Collections.emptyMap())
+               );
+
+               assertTrue(((TableAggregateFunctionDefinition) 
fd).getTableAggregateFunction() instanceof TestTableAggFunction);
+       }
+
+       /**
+        * Test function.
+        */
+       public static class TestScalarFunction extends ScalarFunction {
+
+       }
+
+       /**
+        * Test function.
+        */
+       public static class TestTableFunction extends TableFunction {
+               @Override
+               public TypeInformation getResultType() {
+                       return TypeInformation.of(Object.class);
+               }
+       }
+
+       /**
+        * Test function.
+        */
+       public static class TestAggFunction extends AggregateFunction {
+               @Override
+               public Object createAccumulator() {
+                       return null;
+               }
+
+               @Override
+               public TypeInformation getResultType() {
+                       return TypeInformation.of(Object.class);
+               }
+
+               @Override
+               public TypeInformation getAccumulatorType() {
+                       return TypeInformation.of(Object.class);
+               }
+
+               @Override
+               public Object getValue(Object accumulator) {
+                       return null;
+               }
+       }
+
+       /**
+        * Test function.
+        */
+       public static class TestTableAggFunction extends TableAggregateFunction 
{
+               @Override
+               public Object createAccumulator() {
+                       return null;
+               }
+
+               @Override
+               public TypeInformation getResultType() {
+                       return TypeInformation.of(Object.class);
+               }
+
+               @Override
+               public TypeInformation getAccumulatorType() {
+                       return TypeInformation.of(Object.class);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 8019ab0..8f210b9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 
 import java.util.List;
@@ -48,7 +49,7 @@ public interface Catalog {
 
        /**
         * Get an optional {@link TableFactory} instance that's responsible for 
generating table-related
-        * instances stored in this catalog, instances such as source/sink and 
function definitions.
+        * instances stored in this catalog, instances such as source/sink.
         *
         * @return an optional TableFactory instance
         */
@@ -57,6 +58,15 @@ public interface Catalog {
        }
 
        /**
+        * Get an optional {@link FunctionDefinitionFactory} instance that's 
responsible for instantiating function definitions.
+        *
+        * @return an optional FunctionDefinitionFactory instance
+        */
+       default Optional<FunctionDefinitionFactory> 
getFunctionDefinitionFactory() {
+               return Optional.empty();
+       }
+
+       /**
         * Open the catalog. Used for any required preparation in 
initialization phase.
         *
         * @throws CatalogException in case of any runtime exception
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
index 2e8c538..1e4d76b 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
@@ -23,9 +23,8 @@ import org.apache.flink.table.functions.FunctionDefinition;
 
 /**
  * A factory to create {@link FunctionDefinition}.
- * See also {@link TableFactory} for more information.
  */
-public interface FunctionDefinitionFactory extends TableFactory {
+public interface FunctionDefinitionFactory {
 
        /**
         * Creates a {@link FunctionDefinition} from given {@link 
CatalogFunction}.

Reply via email to