This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 09bcde276f25da11d5458b3f5ae5580143f2ead9 Author: Timo Walther <[email protected]> AuthorDate: Mon Dec 9 14:38:34 2019 +0100 [hotfix][table] Rename UserFunctionsTypeHelper --- .../java/internal/StreamTableEnvironmentImpl.java | 12 ++++++------ .../table/api/internal/TableEnvironmentImpl.java | 14 +++++++------- .../flink/table/catalog/FunctionCatalog.java | 22 +++++++++++----------- ...eHelper.java => UserDefinedFunctionHelper.java} | 10 +++++----- .../internal/StreamTableEnvironmentImpl.scala | 12 ++++++------ .../org/apache/flink/table/api/expressionDsl.scala | 8 ++++---- .../flink/table/planner/expressions/call.scala | 4 ++-- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 6 +++--- .../flink/table/planner/utils/TableTestBase.scala | 6 +++--- .../flink/table/api/internal/TableEnvImpl.scala | 18 +++++++++--------- .../org/apache/flink/table/expressions/call.scala | 4 ++-- .../table/runtime/aggregate/AggregateUtil.scala | 4 ++-- .../table/runtime/harness/HarnessTestBase.scala | 6 +++--- 13 files changed, 63 insertions(+), 63 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java index 1e42070..760a934 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java @@ -53,7 +53,7 @@ import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.functions.UserFunctionsTypeHelper; +import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.JavaDataStreamQueryOperation; import org.apache.flink.table.operations.OutputConversionModifyOperation; @@ -152,7 +152,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple @Override public <T> void registerFunction(String name, TableFunction<T> tableFunction) { - TypeInformation<T> typeInfo = UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction); + TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction); functionCatalog.registerTempSystemTableFunction( name, @@ -163,8 +163,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple @Override public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) { - TypeInformation<T> typeInfo = UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(aggregateFunction); - TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper + TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction); + TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction(aggregateFunction); functionCatalog.registerTempSystemAggregateFunction( @@ -177,9 +177,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple @Override public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) { - TypeInformation<T> typeInfo = UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction( + TypeInformation<T> typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction( tableAggregateFunction); - TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper + TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction(tableAggregateFunction); functionCatalog.registerTempSystemAggregateFunction( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 838edbb..920124e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -65,7 +65,7 @@ import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.ScalarFunctionDefinition; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.functions.TableFunctionDefinition; -import org.apache.flink.table.functions.UserFunctionsTypeHelper; +import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.module.Module; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CatalogQueryOperation; @@ -875,9 +875,9 @@ public class TableEnvironmentImpl implements TableEnvironment { AggregateFunctionDefinition aggregateFunctionDefinition = (AggregateFunctionDefinition) functionDefinition; AggregateFunction<T, ACC > aggregateFunction = (AggregateFunction<T, ACC >) aggregateFunctionDefinition.getAggregateFunction(); - TypeInformation<T> typeInfo = UserFunctionsTypeHelper + TypeInformation<T> typeInfo = UserDefinedFunctionHelper .getReturnTypeOfAggregateFunction(aggregateFunction); - TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper + TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction(aggregateFunction); functionCatalog.registerTempCatalogAggregateFunction( @@ -888,7 +888,7 @@ public class TableEnvironmentImpl implements TableEnvironment { } else if (functionDefinition instanceof TableFunctionDefinition) { TableFunctionDefinition tableFunctionDefinition = (TableFunctionDefinition) functionDefinition; TableFunction<T> tableFunction = (TableFunction<T>) tableFunctionDefinition.getTableFunction(); - TypeInformation<T> typeInfo = UserFunctionsTypeHelper + TypeInformation<T> typeInfo = UserDefinedFunctionHelper .getReturnTypeOfTableFunction(tableFunction); functionCatalog.registerTempCatalogTableFunction( functionIdentifier, @@ -908,9 +908,9 @@ public class TableEnvironmentImpl implements TableEnvironment { AggregateFunctionDefinition aggregateFunctionDefinition = (AggregateFunctionDefinition) functionDefinition; AggregateFunction<T, ACC > aggregateFunction = (AggregateFunction<T, ACC >) aggregateFunctionDefinition.getAggregateFunction(); - TypeInformation<T> typeInfo = UserFunctionsTypeHelper + TypeInformation<T> typeInfo = UserDefinedFunctionHelper .getReturnTypeOfAggregateFunction(aggregateFunction); - TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper + TypeInformation<ACC> accTypeInfo = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction(aggregateFunction); functionCatalog.registerTempSystemAggregateFunction( functionName, @@ -921,7 +921,7 @@ public class TableEnvironmentImpl implements TableEnvironment { } else if (functionDefinition instanceof TableFunctionDefinition) { TableFunctionDefinition tableFunctionDefinition = (TableFunctionDefinition) functionDefinition; TableFunction<T> tableFunction = (TableFunction<T>) tableFunctionDefinition.getTableFunction(); - TypeInformation<T> typeInfo = UserFunctionsTypeHelper + TypeInformation<T> typeInfo = UserDefinedFunctionHelper .getReturnTypeOfTableFunction(tableFunction); functionCatalog.registerTempSystemTableFunction( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index 6658fb2..be94b0d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -37,7 +37,7 @@ import org.apache.flink.table.functions.TableAggregateFunctionDefinition; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.functions.TableFunctionDefinition; import org.apache.flink.table.functions.UserDefinedAggregateFunction; -import org.apache.flink.table.functions.UserFunctionsTypeHelper; +import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.util.Preconditions; @@ -76,7 +76,7 @@ public class FunctionCatalog implements FunctionLookup { } public void registerTempSystemScalarFunction(String name, ScalarFunction function) { - UserFunctionsTypeHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.validateInstantiation(function.getClass()); registerTempSystemFunction( name, new ScalarFunctionDefinition(name, function) @@ -88,9 +88,9 @@ public class FunctionCatalog implements FunctionLookup { TableFunction<T> function, TypeInformation<T> resultType) { // check if class not Scala object - UserFunctionsTypeHelper.validateNotSingleton(function.getClass()); + UserDefinedFunctionHelper.validateNotSingleton(function.getClass()); // check if class could be instantiated - UserFunctionsTypeHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.validateInstantiation(function.getClass()); registerTempSystemFunction( name, @@ -107,9 +107,9 @@ public class FunctionCatalog implements FunctionLookup { TypeInformation<T> resultType, TypeInformation<ACC> accType) { // check if class not Scala object - UserFunctionsTypeHelper.validateNotSingleton(function.getClass()); + UserDefinedFunctionHelper.validateNotSingleton(function.getClass()); // check if class could be instantiated - UserFunctionsTypeHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.validateInstantiation(function.getClass()); final FunctionDefinition definition; if (function instanceof AggregateFunction) { @@ -135,7 +135,7 @@ public class FunctionCatalog implements FunctionLookup { } public void registerTempCatalogScalarFunction(ObjectIdentifier oi, ScalarFunction function) { - UserFunctionsTypeHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.validateInstantiation(function.getClass()); registerTempCatalogFunction( oi, new ScalarFunctionDefinition(oi.getObjectName(), function) @@ -147,9 +147,9 @@ public class FunctionCatalog implements FunctionLookup { TableFunction<T> function, TypeInformation<T> resultType) { // check if class not Scala object - UserFunctionsTypeHelper.validateNotSingleton(function.getClass()); + UserDefinedFunctionHelper.validateNotSingleton(function.getClass()); // check if class could be instantiated - UserFunctionsTypeHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.validateInstantiation(function.getClass()); registerTempCatalogFunction( oi, @@ -166,9 +166,9 @@ public class FunctionCatalog implements FunctionLookup { TypeInformation<T> resultType, TypeInformation<ACC> accType) { // check if class not Scala object - UserFunctionsTypeHelper.validateNotSingleton(function.getClass()); + UserDefinedFunctionHelper.validateNotSingleton(function.getClass()); // check if class could be instantiated - UserFunctionsTypeHelper.validateInstantiation(function.getClass()); + UserDefinedFunctionHelper.validateInstantiation(function.getClass()); final FunctionDefinition definition; if (function instanceof AggregateFunction) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserFunctionsTypeHelper.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java similarity index 95% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserFunctionsTypeHelper.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java index 5ab383f..6069cb9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserFunctionsTypeHelper.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java @@ -19,8 +19,11 @@ package org.apache.flink.table.functions; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig.ClosureCleanerLevel; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.util.InstantiationUtil; @@ -31,7 +34,7 @@ import java.util.Arrays; * {@link org.apache.flink.table.catalog.FunctionCatalog}. */ @Internal -public class UserFunctionsTypeHelper { +public class UserDefinedFunctionHelper { /** * Tries to infer the TypeInformation of an AggregateFunction's accumulator type. @@ -152,9 +155,6 @@ public class UserFunctionsTypeHelper { throw new ValidationException(String.format( "Function class %s is no proper class," + " it is either abstract, an interface, or a primitive type.", clazz.getCanonicalName())); - } else if (InstantiationUtil.isNonStaticInnerClass(clazz)) { - throw new ValidationException(String.format( - "The class %s is an inner class, but not statically accessible.", clazz.getCanonicalName())); } } @@ -171,6 +171,6 @@ public class UserFunctionsTypeHelper { } } - private UserFunctionsTypeHelper() { + private UserDefinedFunctionHelper() { } } diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala index 9253ecd..fcddda9 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala @@ -34,7 +34,7 @@ import org.apache.flink.table.delegation.{Executor, ExecutorFactory, Planner, Pl import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor} import org.apache.flink.table.expressions.Expression import org.apache.flink.table.factories.ComponentFactoryService -import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserFunctionsTypeHelper} +import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedFunctionHelper} import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.operations.{OutputConversionModifyOperation, QueryOperation, ScalaDataStreamQueryOperation} import org.apache.flink.table.sources.{TableSource, TableSourceValidation} @@ -138,7 +138,7 @@ class StreamTableEnvironmentImpl ( } override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = { - val typeInfo = UserFunctionsTypeHelper + val typeInfo = UserDefinedFunctionHelper .getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]]) functionCatalog.registerTempSystemTableFunction( name, @@ -151,9 +151,9 @@ class StreamTableEnvironmentImpl ( name: String, f: AggregateFunction[T, ACC]) : Unit = { - val typeInfo = UserFunctionsTypeHelper + val typeInfo = UserDefinedFunctionHelper .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]]) - val accTypeInfo = UserFunctionsTypeHelper + val accTypeInfo = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]]) functionCatalog.registerTempSystemAggregateFunction( name, @@ -167,9 +167,9 @@ class StreamTableEnvironmentImpl ( name: String, f: TableAggregateFunction[T, ACC]) : Unit = { - val typeInfo = UserFunctionsTypeHelper + val typeInfo = UserDefinedFunctionHelper .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]]) - val accTypeInfo = UserFunctionsTypeHelper + val accTypeInfo = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]]) functionCatalog.registerTempSystemAggregateFunction( name, diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala index 2248f8e..0db50e8 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.table.expressions._ import org.apache.flink.table.expressions.utils.ApiExpressionUtils._ import org.apache.flink.table.functions.BuiltInFunctionDefinitions._ -import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserFunctionsTypeHelper, _} +import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserDefinedFunctionHelper, _} import org.apache.flink.table.types.DataType import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType @@ -1119,7 +1119,7 @@ trait ImplicitExpressionConversions { * Calls a table function for the given parameters. */ def apply(params: Expression*): Expression = { - val resultTypeInfo: TypeInformation[T] = UserFunctionsTypeHelper + val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper .getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]]) unresolvedCall(new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo), params: _*) } @@ -1129,10 +1129,10 @@ trait ImplicitExpressionConversions { (val a: UserDefinedAggregateFunction[T, ACC]) { private def createFunctionDefinition(): FunctionDefinition = { - val resultTypeInfo: TypeInformation[T] = UserFunctionsTypeHelper + val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper .getReturnTypeOfAggregateFunction(a, implicitly[TypeInformation[T]]) - val accTypeInfo: TypeInformation[ACC] = UserFunctionsTypeHelper. + val accTypeInfo: TypeInformation[ACC] = UserDefinedFunctionHelper. getAccumulatorTypeOfAggregateFunction(a, implicitly[TypeInformation[ACC]]) a match { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala index 7dacccf..b4aeb41 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala @@ -201,9 +201,9 @@ case class PlannerTableFunctionCall( override def validateInput(): ValidationResult = { // check if not Scala object - UserFunctionsTypeHelper.validateNotSingleton(tableFunction.getClass) + UserDefinedFunctionHelper.validateNotSingleton(tableFunction.getClass) // check if class could be instantiated - UserFunctionsTypeHelper.validateInstantiation(tableFunction.getClass) + UserDefinedFunctionHelper.validateInstantiation(tableFunction.getClass) // look for a signature that matches the input types val signature = parameters.map(_.resultType).map(fromLegacyInfoToDataType) val foundMethod = getUserDefinedMethod( diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 4e2e0d5..aa3d48f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -22,7 +22,7 @@ import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} import org.apache.flink.table.expressions._ import org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis -import org.apache.flink.table.functions.{FunctionIdentifier, UserFunctionsTypeHelper} +import org.apache.flink.table.functions.{FunctionIdentifier, UserDefinedFunctionHelper} import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory} import org.apache.flink.table.planner.delegation.PlannerContext @@ -713,8 +713,8 @@ class FlinkRelMdHandlerTestBase { protected lazy val tableAggCall = { val top3 = new Top3 - val resultTypeInfo = UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(top3) - val accTypeInfo = UserFunctionsTypeHelper.getAccumulatorTypeOfAggregateFunction(top3) + val resultTypeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(top3) + val accTypeInfo = UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(top3) val resultDataType = TypeConversions.fromLegacyInfoToDataType(resultTypeInfo) val accDataType = TypeConversions.fromLegacyInfoToDataType(accTypeInfo) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 2b59214..c65f5c7 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -963,7 +963,7 @@ class TestingTableEnvironment private( // `<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);` // is added into TableEnvironment def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = { - val typeInfo = UserFunctionsTypeHelper + val typeInfo = UserDefinedFunctionHelper .getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]]) functionCatalog.registerTempSystemTableFunction( name, @@ -993,9 +993,9 @@ class TestingTableEnvironment private( private def registerUserDefinedAggregateFunction[T: TypeInformation, ACC: TypeInformation]( name: String, f: UserDefinedAggregateFunction[T, ACC]): Unit = { - val typeInfo = UserFunctionsTypeHelper + val typeInfo = UserDefinedFunctionHelper .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]]) - val accTypeInfo = UserFunctionsTypeHelper + val accTypeInfo = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]]) functionCatalog.registerTempSystemAggregateFunction( name, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 3f55bf3..d6eba73 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -142,7 +142,7 @@ abstract class TableEnvImpl( name: String, function: TableFunction[T]) : Unit = { - val resultTypeInfo: TypeInformation[T] = UserFunctionsTypeHelper + val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper .getReturnTypeOfTableFunction( function, implicitly[TypeInformation[T]]) @@ -161,12 +161,12 @@ abstract class TableEnvImpl( name: String, function: UserDefinedAggregateFunction[T, ACC]) : Unit = { - val resultTypeInfo: TypeInformation[T] = UserFunctionsTypeHelper + val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper .getReturnTypeOfAggregateFunction( function, implicitly[TypeInformation[T]]) - val accTypeInfo: TypeInformation[ACC] = UserFunctionsTypeHelper + val accTypeInfo: TypeInformation[ACC] = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction( function, implicitly[TypeInformation[ACC]]) @@ -824,8 +824,8 @@ abstract class TableEnvImpl( val aggregateFunctionDefinition = functionDefinition.asInstanceOf[AggregateFunctionDefinition] val aggregateFunction = aggregateFunctionDefinition .getAggregateFunction.asInstanceOf[AggregateFunction[T, ACC]] - val typeInfo = UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(aggregateFunction) - val accTypeInfo = UserFunctionsTypeHelper + val typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction) + val accTypeInfo = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction(aggregateFunction) functionCatalog.registerTempCatalogAggregateFunction( functionIdentifier, @@ -836,7 +836,7 @@ abstract class TableEnvImpl( } else if (functionDefinition.isInstanceOf[TableFunctionDefinition]) { val tableFunctionDefinition = functionDefinition.asInstanceOf[TableFunctionDefinition] val tableFunction = tableFunctionDefinition.getTableFunction.asInstanceOf[TableFunction[T]] - val typeInfo = UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction) + val typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction) functionCatalog.registerTempCatalogTableFunction( functionIdentifier, tableFunction, @@ -857,8 +857,8 @@ abstract class TableEnvImpl( val aggregateFunctionDefinition = functionDefinition.asInstanceOf[AggregateFunctionDefinition] val aggregateFunction = aggregateFunctionDefinition .getAggregateFunction.asInstanceOf[AggregateFunction[T, ACC]] - val typeInfo = UserFunctionsTypeHelper.getReturnTypeOfAggregateFunction(aggregateFunction) - val accTypeInfo = UserFunctionsTypeHelper + val typeInfo = UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction) + val accTypeInfo = UserDefinedFunctionHelper .getAccumulatorTypeOfAggregateFunction(aggregateFunction) functionCatalog.registerTempSystemAggregateFunction( functionName, @@ -868,7 +868,7 @@ abstract class TableEnvImpl( } else if (functionDefinition.isInstanceOf[TableFunctionDefinition]) { val tableFunctionDefinition = functionDefinition.asInstanceOf[TableFunctionDefinition] val tableFunction = tableFunctionDefinition.getTableFunction.asInstanceOf[TableFunction[T]] - val typeInfo = UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction) + val typeInfo = UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction) functionCatalog.registerTempSystemTableFunction( functionName, tableFunction, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala index f2da6fc..b40053a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala @@ -299,9 +299,9 @@ case class PlannerTableFunctionCall( override def validateInput(): ValidationResult = { // check if not Scala object - UserFunctionsTypeHelper.validateNotSingleton(tableFunction.getClass) + UserDefinedFunctionHelper.validateNotSingleton(tableFunction.getClass) // check if class could be instantiated - UserFunctionsTypeHelper.validateInstantiation(tableFunction.getClass) + UserDefinedFunctionHelper.validateInstantiation(tableFunction.getClass) // look for a signature that matches the input types val signature = parameters.map(_.resultType) val foundMethod = getUserDefinedMethod(tableFunction, "eval", typeInfoToClass(signature)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 0d4e597..c9924db 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -43,7 +43,7 @@ import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.aggfunctions._ import org.apache.flink.table.functions.utils.AggSqlFunction import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ -import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, UserDefinedAggregateFunction, UserFunctionsTypeHelper} +import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, UserDefinedAggregateFunction, UserDefinedFunctionHelper} import org.apache.flink.table.plan.logical._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo @@ -1420,7 +1420,7 @@ object AggregateUtil { val (accumulatorType, accSpecs) = { val accType = aggregateFunction match { case udagg: AggSqlFunction => udagg.accType - case _ => UserFunctionsTypeHelper.getAccumulatorTypeOfAggregateFunction(aggregate) + case _ => UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregate) } removeStateViewFieldsFromAccTypeInfo( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala index 94d2e2e..577ea7b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala @@ -36,7 +36,7 @@ import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.api.dataview.DataView import org.apache.flink.table.codegen.GeneratedAggregationsFunction import org.apache.flink.table.functions.aggfunctions.{CountAggFunction, IntSumWithRetractAggFunction, LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction} -import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction, UserFunctionsTypeHelper} +import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction, UserDefinedFunctionHelper} import org.apache.flink.table.runtime.aggregate.GeneratedAggregations import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -79,11 +79,11 @@ class HarnessTestBase extends StreamingWithStateTestBase { protected val minMaxAggregationStateType: RowTypeInfo = new RowTypeInfo(minMaxAggregates - .map(UserFunctionsTypeHelper.getAccumulatorTypeOfAggregateFunction(_)): _*) + .map(UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(_)): _*) protected val sumAggregationStateType: RowTypeInfo = new RowTypeInfo(sumAggregates - .map(UserFunctionsTypeHelper.getAccumulatorTypeOfAggregateFunction(_)): _*) + .map(UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(_)): _*) protected val minMaxFuncName = "MinMaxAggregateHelper" protected val sumFuncName = "SumAggregationHelper"
