This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09bcde276f25da11d5458b3f5ae5580143f2ead9
Author: Timo Walther <[email protected]>
AuthorDate: Mon Dec 9 14:38:34 2019 +0100

    [hotfix][table] Rename UserFunctionsTypeHelper
---
 .../java/internal/StreamTableEnvironmentImpl.java  | 12 ++++++------
 .../table/api/internal/TableEnvironmentImpl.java   | 14 +++++++-------
 .../flink/table/catalog/FunctionCatalog.java       | 22 +++++++++++-----------
 ...eHelper.java => UserDefinedFunctionHelper.java} | 10 +++++-----
 .../internal/StreamTableEnvironmentImpl.scala      | 12 ++++++------
 .../org/apache/flink/table/api/expressionDsl.scala |  8 ++++----
 .../flink/table/planner/expressions/call.scala     |  4 ++--
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  6 +++---
 .../flink/table/planner/utils/TableTestBase.scala  |  6 +++---
 .../flink/table/api/internal/TableEnvImpl.scala    | 18 +++++++++---------
 .../org/apache/flink/table/expressions/call.scala  |  4 ++--
 .../table/runtime/aggregate/AggregateUtil.scala    |  4 ++--
 .../table/runtime/harness/HarnessTestBase.scala    |  6 +++---
 13 files changed, 63 insertions(+), 63 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 1e42070..760a934 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -53,7 +53,7 @@ import 
org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.UserFunctionsTypeHelper;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
@@ -152,7 +152,7 @@ public final class StreamTableEnvironmentImpl extends 
TableEnvironmentImpl imple
 
        @Override
        public <T> void registerFunction(String name, TableFunction<T> 
tableFunction) {
-               TypeInformation<T> typeInfo = 
UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction);
+               TypeInformation<T> typeInfo = 
UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
 
                functionCatalog.registerTempSystemTableFunction(
                        name,
@@ -163,8 +163,8 @@ public final class StreamTableEnvironmentImpl extends 
TableEnvironmentImpl imple
 
        @Override
        public <T, ACC> void registerFunction(String name, AggregateFunction<T, 
ACC> aggregateFunction) {
-               TypeInformation<T> typeInfo = 
UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
-               TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
+               TypeInformation<T> typeInfo = 
UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
+               TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper
                        
.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
 
                functionCatalog.registerTempSystemAggregateFunction(
@@ -177,9 +177,9 @@ public final class StreamTableEnvironmentImpl extends 
TableEnvironmentImpl imple
 
        @Override
        public <T, ACC> void registerFunction(String name, 
TableAggregateFunction<T, ACC> tableAggregateFunction) {
-               TypeInformation<T> typeInfo = 
UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(
+               TypeInformation<T> typeInfo = 
UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(
                        tableAggregateFunction);
-               TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
+               TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper
                        
.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
 
                functionCatalog.registerTempSystemAggregateFunction(
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 838edbb..920124e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -65,7 +65,7 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.TableFunctionDefinition;
-import org.apache.flink.table.functions.UserFunctionsTypeHelper;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogQueryOperation;
@@ -875,9 +875,9 @@ public class TableEnvironmentImpl implements 
TableEnvironment {
                        AggregateFunctionDefinition aggregateFunctionDefinition 
= (AggregateFunctionDefinition) functionDefinition;
                        AggregateFunction<T, ACC > aggregateFunction =
                                (AggregateFunction<T, ACC >) 
aggregateFunctionDefinition.getAggregateFunction();
-                       TypeInformation<T> typeInfo = UserFunctionsTypeHelper
+                       TypeInformation<T> typeInfo = UserDefinedFunctionHelper
                                
.getReturnTypeOfAggregateFunction(aggregateFunction);
-                       TypeInformation<ACC> accTypeInfo = 
UserFunctionsTypeHelper
+                       TypeInformation<ACC> accTypeInfo = 
UserDefinedFunctionHelper
                                
.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
 
                        functionCatalog.registerTempCatalogAggregateFunction(
@@ -888,7 +888,7 @@ public class TableEnvironmentImpl implements 
TableEnvironment {
                } else if (functionDefinition instanceof 
TableFunctionDefinition) {
                        TableFunctionDefinition tableFunctionDefinition = 
(TableFunctionDefinition) functionDefinition;
                        TableFunction<T> tableFunction = (TableFunction<T>) 
tableFunctionDefinition.getTableFunction();
-                       TypeInformation<T> typeInfo = UserFunctionsTypeHelper
+                       TypeInformation<T> typeInfo = UserDefinedFunctionHelper
                                .getReturnTypeOfTableFunction(tableFunction);
                        functionCatalog.registerTempCatalogTableFunction(
                                functionIdentifier,
@@ -908,9 +908,9 @@ public class TableEnvironmentImpl implements 
TableEnvironment {
                        AggregateFunctionDefinition aggregateFunctionDefinition 
= (AggregateFunctionDefinition) functionDefinition;
                        AggregateFunction<T, ACC > aggregateFunction =
                                (AggregateFunction<T, ACC >) 
aggregateFunctionDefinition.getAggregateFunction();
-                       TypeInformation<T> typeInfo = UserFunctionsTypeHelper
+                       TypeInformation<T> typeInfo = UserDefinedFunctionHelper
                                
.getReturnTypeOfAggregateFunction(aggregateFunction);
-                       TypeInformation<ACC> accTypeInfo = 
UserFunctionsTypeHelper
+                       TypeInformation<ACC> accTypeInfo = 
UserDefinedFunctionHelper
                                
.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
                        functionCatalog.registerTempSystemAggregateFunction(
                                functionName,
@@ -921,7 +921,7 @@ public class TableEnvironmentImpl implements 
TableEnvironment {
                } else if (functionDefinition instanceof 
TableFunctionDefinition) {
                        TableFunctionDefinition tableFunctionDefinition = 
(TableFunctionDefinition) functionDefinition;
                        TableFunction<T> tableFunction = (TableFunction<T>) 
tableFunctionDefinition.getTableFunction();
-                       TypeInformation<T> typeInfo = UserFunctionsTypeHelper
+                       TypeInformation<T> typeInfo = UserDefinedFunctionHelper
                                .getReturnTypeOfTableFunction(tableFunction);
 
                        functionCatalog.registerTempSystemTableFunction(
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 6658fb2..be94b0d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -37,7 +37,7 @@ import 
org.apache.flink.table.functions.TableAggregateFunctionDefinition;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedAggregateFunction;
-import org.apache.flink.table.functions.UserFunctionsTypeHelper;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.util.Preconditions;
 
@@ -76,7 +76,7 @@ public class FunctionCatalog implements FunctionLookup {
        }
 
        public void registerTempSystemScalarFunction(String name, 
ScalarFunction function) {
-               
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+               
UserDefinedFunctionHelper.validateInstantiation(function.getClass());
                registerTempSystemFunction(
                        name,
                        new ScalarFunctionDefinition(name, function)
@@ -88,9 +88,9 @@ public class FunctionCatalog implements FunctionLookup {
                        TableFunction<T> function,
                        TypeInformation<T> resultType) {
                // check if class not Scala object
-               
UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
+               
UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
                // check if class could be instantiated
-               
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+               
UserDefinedFunctionHelper.validateInstantiation(function.getClass());
 
                registerTempSystemFunction(
                        name,
@@ -107,9 +107,9 @@ public class FunctionCatalog implements FunctionLookup {
                        TypeInformation<T> resultType,
                        TypeInformation<ACC> accType) {
                // check if class not Scala object
-               
UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
+               
UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
                // check if class could be instantiated
-               
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+               
UserDefinedFunctionHelper.validateInstantiation(function.getClass());
 
                final FunctionDefinition definition;
                if (function instanceof AggregateFunction) {
@@ -135,7 +135,7 @@ public class FunctionCatalog implements FunctionLookup {
        }
 
        public void registerTempCatalogScalarFunction(ObjectIdentifier oi, 
ScalarFunction function) {
-               
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+               
UserDefinedFunctionHelper.validateInstantiation(function.getClass());
                registerTempCatalogFunction(
                        oi,
                        new ScalarFunctionDefinition(oi.getObjectName(), 
function)
@@ -147,9 +147,9 @@ public class FunctionCatalog implements FunctionLookup {
                        TableFunction<T> function,
                        TypeInformation<T> resultType) {
                // check if class not Scala object
-               
UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
+               
UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
                // check if class could be instantiated
-               
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+               
UserDefinedFunctionHelper.validateInstantiation(function.getClass());
 
                registerTempCatalogFunction(
                        oi,
@@ -166,9 +166,9 @@ public class FunctionCatalog implements FunctionLookup {
                        TypeInformation<T> resultType,
                        TypeInformation<ACC> accType) {
                // check if class not Scala object
-               
UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
+               
UserDefinedFunctionHelper.validateNotSingleton(function.getClass());
                // check if class could be instantiated
-               
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+               
UserDefinedFunctionHelper.validateInstantiation(function.getClass());
 
                final FunctionDefinition definition;
                if (function instanceof AggregateFunction) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserFunctionsTypeHelper.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
similarity index 95%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserFunctionsTypeHelper.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
index 5ab383f..6069cb9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserFunctionsTypeHelper.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
@@ -19,8 +19,11 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig.ClosureCleanerLevel;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -31,7 +34,7 @@ import java.util.Arrays;
  * {@link org.apache.flink.table.catalog.FunctionCatalog}.
  */
 @Internal
-public class UserFunctionsTypeHelper {
+public class UserDefinedFunctionHelper {
 
        /**
         * Tries to infer the TypeInformation of an AggregateFunction's 
accumulator type.
@@ -152,9 +155,6 @@ public class UserFunctionsTypeHelper {
                        throw new ValidationException(String.format(
                                "Function class %s is no proper class," +
                                        " it is either abstract, an interface, 
or a primitive type.", clazz.getCanonicalName()));
-               } else if (InstantiationUtil.isNonStaticInnerClass(clazz)) {
-                       throw new ValidationException(String.format(
-                               "The class %s is an inner class, but not 
statically accessible.", clazz.getCanonicalName()));
                }
        }
 
@@ -171,6 +171,6 @@ public class UserFunctionsTypeHelper {
                }
        }
 
-       private UserFunctionsTypeHelper() {
+       private UserDefinedFunctionHelper() {
        }
 }
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 9253ecd..fcddda9 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -34,7 +34,7 @@ import org.apache.flink.table.delegation.{Executor, 
ExecutorFactory, Planner, Pl
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, 
StreamTableDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.factories.ComponentFactoryService
-import org.apache.flink.table.functions.{AggregateFunction, 
TableAggregateFunction, TableFunction, UserFunctionsTypeHelper}
+import org.apache.flink.table.functions.{AggregateFunction, 
TableAggregateFunction, TableFunction, UserDefinedFunctionHelper}
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.operations.{OutputConversionModifyOperation, 
QueryOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
@@ -138,7 +138,7 @@ class StreamTableEnvironmentImpl (
   }
 
   override def registerFunction[T: TypeInformation](name: String, tf: 
TableFunction[T]): Unit = {
-    val typeInfo = UserFunctionsTypeHelper
+    val typeInfo = UserDefinedFunctionHelper
       .getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]])
     functionCatalog.registerTempSystemTableFunction(
       name,
@@ -151,9 +151,9 @@ class StreamTableEnvironmentImpl (
       name: String,
       f: AggregateFunction[T, ACC])
     : Unit = {
-    val typeInfo = UserFunctionsTypeHelper
+    val typeInfo = UserDefinedFunctionHelper
       .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
-    val accTypeInfo = UserFunctionsTypeHelper
+    val accTypeInfo = UserDefinedFunctionHelper
       .getAccumulatorTypeOfAggregateFunction(f, 
implicitly[TypeInformation[ACC]])
     functionCatalog.registerTempSystemAggregateFunction(
       name,
@@ -167,9 +167,9 @@ class StreamTableEnvironmentImpl (
       name: String,
       f: TableAggregateFunction[T, ACC])
     : Unit = {
-    val typeInfo = UserFunctionsTypeHelper
+    val typeInfo = UserDefinedFunctionHelper
       .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
-    val accTypeInfo = UserFunctionsTypeHelper
+    val accTypeInfo = UserDefinedFunctionHelper
       .getAccumulatorTypeOfAggregateFunction(f, 
implicitly[TypeInformation[ACC]])
     functionCatalog.registerTempSystemAggregateFunction(
       name,
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index 2248f8e..0db50e8 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, 
TypeInformation}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction, 
UserDefinedAggregateFunction, UserFunctionsTypeHelper, _}
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction, 
UserDefinedAggregateFunction, UserDefinedFunctionHelper, _}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.utils.TypeConversions
 import 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
@@ -1119,7 +1119,7 @@ trait ImplicitExpressionConversions {
       * Calls a table function for the given parameters.
       */
     def apply(params: Expression*): Expression = {
-      val resultTypeInfo: TypeInformation[T] = UserFunctionsTypeHelper
+      val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
         .getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]])
       unresolvedCall(new TableFunctionDefinition(t.getClass.getName, t, 
resultTypeInfo), params: _*)
     }
@@ -1129,10 +1129,10 @@ trait ImplicitExpressionConversions {
       (val a: UserDefinedAggregateFunction[T, ACC]) {
 
     private def createFunctionDefinition(): FunctionDefinition = {
-      val resultTypeInfo: TypeInformation[T] = UserFunctionsTypeHelper
+      val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
         .getReturnTypeOfAggregateFunction(a, implicitly[TypeInformation[T]])
 
-      val accTypeInfo: TypeInformation[ACC] = UserFunctionsTypeHelper.
+      val accTypeInfo: TypeInformation[ACC] = UserDefinedFunctionHelper.
         getAccumulatorTypeOfAggregateFunction(a, 
implicitly[TypeInformation[ACC]])
 
       a match {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
index 7dacccf..b4aeb41 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
@@ -201,9 +201,9 @@ case class PlannerTableFunctionCall(
 
   override def validateInput(): ValidationResult = {
     // check if not Scala object
-    UserFunctionsTypeHelper.validateNotSingleton(tableFunction.getClass)
+    UserDefinedFunctionHelper.validateNotSingleton(tableFunction.getClass)
     // check if class could be instantiated
-    UserFunctionsTypeHelper.validateInstantiation(tableFunction.getClass)
+    UserDefinedFunctionHelper.validateInstantiation(tableFunction.getClass)
     // look for a signature that matches the input types
     val signature = parameters.map(_.resultType).map(fromLegacyInfoToDataType)
     val foundMethod = getUserDefinedMethod(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 4e2e0d5..aa3d48f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.{TableConfig, 
TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, 
GenericInMemoryCatalog}
 import org.apache.flink.table.expressions._
 import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis
-import org.apache.flink.table.functions.{FunctionIdentifier, 
UserFunctionsTypeHelper}
+import org.apache.flink.table.functions.{FunctionIdentifier, 
UserDefinedFunctionHelper}
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, 
FlinkTypeFactory}
 import org.apache.flink.table.planner.delegation.PlannerContext
@@ -713,8 +713,8 @@ class FlinkRelMdHandlerTestBase {
 
   protected lazy val tableAggCall = {
     val top3 = new Top3
-    val resultTypeInfo = 
UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(top3)
-    val accTypeInfo = 
UserFunctionsTypeHelper.getAccumulatorTypeOfAggregateFunction(top3)
+    val resultTypeInfo = 
UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(top3)
+    val accTypeInfo = 
UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(top3)
 
     val resultDataType = 
TypeConversions.fromLegacyInfoToDataType(resultTypeInfo)
     val accDataType = TypeConversions.fromLegacyInfoToDataType(accTypeInfo)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 2b59214..c65f5c7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -963,7 +963,7 @@ class TestingTableEnvironment private(
   // `<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> 
aggregateFunction);`
   // is added into TableEnvironment
   def registerFunction[T: TypeInformation](name: String, tf: 
TableFunction[T]): Unit = {
-    val typeInfo = UserFunctionsTypeHelper
+    val typeInfo = UserDefinedFunctionHelper
       .getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]])
     functionCatalog.registerTempSystemTableFunction(
       name,
@@ -993,9 +993,9 @@ class TestingTableEnvironment private(
   private def registerUserDefinedAggregateFunction[T: TypeInformation, ACC: 
TypeInformation](
       name: String,
       f: UserDefinedAggregateFunction[T, ACC]): Unit = {
-    val typeInfo = UserFunctionsTypeHelper
+    val typeInfo = UserDefinedFunctionHelper
       .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
-    val accTypeInfo = UserFunctionsTypeHelper
+    val accTypeInfo = UserDefinedFunctionHelper
       .getAccumulatorTypeOfAggregateFunction(f, 
implicitly[TypeInformation[ACC]])
     functionCatalog.registerTempSystemAggregateFunction(
       name,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3f55bf3..d6eba73 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -142,7 +142,7 @@ abstract class TableEnvImpl(
       name: String,
       function: TableFunction[T])
     : Unit = {
-    val resultTypeInfo: TypeInformation[T] = UserFunctionsTypeHelper
+    val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
       .getReturnTypeOfTableFunction(
         function,
         implicitly[TypeInformation[T]])
@@ -161,12 +161,12 @@ abstract class TableEnvImpl(
       name: String,
       function: UserDefinedAggregateFunction[T, ACC])
     : Unit = {
-    val resultTypeInfo: TypeInformation[T] = UserFunctionsTypeHelper
+    val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
       .getReturnTypeOfAggregateFunction(
         function,
         implicitly[TypeInformation[T]])
 
-    val accTypeInfo: TypeInformation[ACC] = UserFunctionsTypeHelper
+    val accTypeInfo: TypeInformation[ACC] = UserDefinedFunctionHelper
       .getAccumulatorTypeOfAggregateFunction(
       function,
       implicitly[TypeInformation[ACC]])
@@ -824,8 +824,8 @@ abstract class TableEnvImpl(
       val aggregateFunctionDefinition = 
functionDefinition.asInstanceOf[AggregateFunctionDefinition]
       val aggregateFunction = aggregateFunctionDefinition
         .getAggregateFunction.asInstanceOf[AggregateFunction[T, ACC]]
-      val typeInfo = 
UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(aggregateFunction)
-      val accTypeInfo = UserFunctionsTypeHelper
+      val typeInfo = 
UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction)
+      val accTypeInfo = UserDefinedFunctionHelper
         .getAccumulatorTypeOfAggregateFunction(aggregateFunction)
       functionCatalog.registerTempCatalogAggregateFunction(
         functionIdentifier,
@@ -836,7 +836,7 @@ abstract class TableEnvImpl(
     } else if (functionDefinition.isInstanceOf[TableFunctionDefinition]) {
       val tableFunctionDefinition = 
functionDefinition.asInstanceOf[TableFunctionDefinition]
       val tableFunction = 
tableFunctionDefinition.getTableFunction.asInstanceOf[TableFunction[T]]
-      val typeInfo = 
UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction)
+      val typeInfo = 
UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction)
       functionCatalog.registerTempCatalogTableFunction(
         functionIdentifier,
         tableFunction,
@@ -857,8 +857,8 @@ abstract class TableEnvImpl(
       val aggregateFunctionDefinition = 
functionDefinition.asInstanceOf[AggregateFunctionDefinition]
       val aggregateFunction = aggregateFunctionDefinition
         .getAggregateFunction.asInstanceOf[AggregateFunction[T, ACC]]
-      val typeInfo = 
UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(aggregateFunction)
-      val accTypeInfo = UserFunctionsTypeHelper
+      val typeInfo = 
UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction)
+      val accTypeInfo = UserDefinedFunctionHelper
         .getAccumulatorTypeOfAggregateFunction(aggregateFunction)
       functionCatalog.registerTempSystemAggregateFunction(
         functionName,
@@ -868,7 +868,7 @@ abstract class TableEnvImpl(
     } else if (functionDefinition.isInstanceOf[TableFunctionDefinition]) {
       val tableFunctionDefinition = 
functionDefinition.asInstanceOf[TableFunctionDefinition]
       val tableFunction = 
tableFunctionDefinition.getTableFunction.asInstanceOf[TableFunction[T]]
-      val typeInfo = 
UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction)
+      val typeInfo = 
UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction)
       functionCatalog.registerTempSystemTableFunction(
         functionName,
         tableFunction,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
index f2da6fc..b40053a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -299,9 +299,9 @@ case class PlannerTableFunctionCall(
 
   override def validateInput(): ValidationResult = {
     // check if not Scala object
-    UserFunctionsTypeHelper.validateNotSingleton(tableFunction.getClass)
+    UserDefinedFunctionHelper.validateNotSingleton(tableFunction.getClass)
     // check if class could be instantiated
-    UserFunctionsTypeHelper.validateInstantiation(tableFunction.getClass)
+    UserDefinedFunctionHelper.validateInstantiation(tableFunction.getClass)
     // look for a signature that matches the input types
     val signature = parameters.map(_.resultType)
     val foundMethod = getUserDefinedMethod(tableFunction, "eval", 
typeInfoToClass(signature))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 0d4e597..c9924db 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -43,7 +43,7 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions._
 import org.apache.flink.table.functions.utils.AggSqlFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.functions.{AggregateFunction, 
TableAggregateFunction, UserDefinedAggregateFunction, UserFunctionsTypeHelper}
+import org.apache.flink.table.functions.{AggregateFunction, 
TableAggregateFunction, UserDefinedAggregateFunction, UserDefinedFunctionHelper}
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
@@ -1420,7 +1420,7 @@ object AggregateUtil {
     val (accumulatorType, accSpecs) = {
       val accType = aggregateFunction match {
         case udagg: AggSqlFunction => udagg.accType
-        case _ => 
UserFunctionsTypeHelper.getAccumulatorTypeOfAggregateFunction(aggregate)
+        case _ => 
UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregate)
       }
 
       removeStateViewFieldsFromAccTypeInfo(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 94d2e2e..577ea7b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -36,7 +36,7 @@ import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.api.dataview.DataView
 import org.apache.flink.table.codegen.GeneratedAggregationsFunction
 import org.apache.flink.table.functions.aggfunctions.{CountAggFunction, 
IntSumWithRetractAggFunction, LongMaxWithRetractAggFunction, 
LongMinWithRetractAggFunction}
-import org.apache.flink.table.functions.{AggregateFunction, 
UserDefinedFunction, UserFunctionsTypeHelper}
+import org.apache.flink.table.functions.{AggregateFunction, 
UserDefinedFunction, UserDefinedFunctionHelper}
 import org.apache.flink.table.runtime.aggregate.GeneratedAggregations
 import 
org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator,
 RowResultSortComparatorWithWatermarks}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -79,11 +79,11 @@ class HarnessTestBase extends StreamingWithStateTestBase {
 
   protected val minMaxAggregationStateType: RowTypeInfo =
     new RowTypeInfo(minMaxAggregates
-      .map(UserFunctionsTypeHelper.getAccumulatorTypeOfAggregateFunction(_)): 
_*)
+      
.map(UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(_)): _*)
 
   protected val sumAggregationStateType: RowTypeInfo =
     new RowTypeInfo(sumAggregates
-      .map(UserFunctionsTypeHelper.getAccumulatorTypeOfAggregateFunction(_)): 
_*)
+      
.map(UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(_)): _*)
 
   protected val minMaxFuncName = "MinMaxAggregateHelper"
   protected val sumFuncName = "SumAggregationHelper"

Reply via email to