[ https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143282#comment-16143282 ]
ASF GitHub Bot commented on FLINK-7439:
---------------------------------------
Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4536#discussion_r135425816
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala ---
@@ -74,48 +75,102 @@ class TableSqlFunction(
object TableSqlFunction {
- /**
- * Util function to create a [[TableSqlFunction]].
- *
- * @param name function name (used by SQL parser)
- * @param udtf user-defined table function to be called
- * @param rowTypeInfo the row type information generated by the table function
- * @param typeFactory type factory for converting Flink's between Calcite's types
- * @param functionImpl Calcite table function schema
- * @return [[TableSqlFunction]]
- */
- def apply(
+ private[flink] def createOperandTypeInference(
name: String,
udtf: TableFunction[_],
- rowTypeInfo: TypeInformation[_],
- typeFactory: FlinkTypeFactory,
- functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
-
- val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
- val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
- // derives operands' data types and type families
- functionImpl.getParameters.asScala.foreach{ o =>
- val relType: RelDataType = o.getType(typeFactory)
- argTypes.add(relType)
- typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
+ typeFactory: FlinkTypeFactory)
+ : SqlOperandTypeInference = {
+ /**
+ * Operand type inference based on [[TableFunction]] given information.
+ */
+ new SqlOperandTypeInference {
+ override def inferOperandTypes(
+ callBinding: SqlCallBinding,
+ returnType: RelDataType,
+ operandTypes: Array[RelDataType]): Unit = {
+
+ val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+ val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
+ .getOrElse(throw new ValidationException(
+ s"Given parameters of function '$name' do not match any
signature. \n" +
+ s"Actual: ${signatureToString(operandTypeInfo)} \n" +
+ s"Expected: ${signaturesToString(udtf, "eval")}"))
+
+ val inferredTypes = foundSignature
+ .map(TypeExtractor.getForClass(_))
+ .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
+
+ for (i <- operandTypes.indices) {
+ if (i < inferredTypes.length - 1) {
+ operandTypes(i) = inferredTypes(i)
+ } else if (null != inferredTypes.last.getComponentType) {
+ // last argument is a collection, the array type
+ operandTypes(i) = inferredTypes.last.getComponentType
+ } else {
+ operandTypes(i) = inferredTypes.last
+ }
+ }
+ }
}
- // derives whether the 'input'th parameter of a method is optional.
- val optional: Predicate[Integer] = new Predicate[Integer]() {
- def apply(input: Integer): Boolean = {
- functionImpl.getParameters.get(input).isOptional
+ }
+
+ private[flink] def createOperandTypeChecker(
--- End diff ---
Can we share the `createOperandTypeInference` and `createOperandTypeChecker`
methods across `ScalarSqlFunction`, `TableSqlFunction`, and `AggSqlFunction`?
All of the UDX functions need them, and the logic of these methods is the same
or similar. Maybe we can move them into `UserDefinedFunctionUtils`. What do
you think?
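For example, a shared helper inside the existing `UserDefinedFunctionUtils` object might look roughly like the sketch below. This is only an illustration of the suggestion, not the actual change in this PR: the generic `UserDefinedFunction` parameter and the assumption that `getOperandTypeInfo` and `getEvalMethodSignature` can be reused for all UDX kinds are hypothetical.

```scala
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.SqlCallBinding
import org.apache.calcite.sql.`type`.SqlOperandTypeInference
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.functions.UserDefinedFunction

// Sketch: shared operand type inference for all UDX kinds, to live in
// UserDefinedFunctionUtils next to getOperandTypeInfo / getEvalMethodSignature.
// The UserDefinedFunction parameter is an assumption, not part of this PR.
private[flink] def createOperandTypeInference(
    name: String,
    function: UserDefinedFunction,
    typeFactory: FlinkTypeFactory)
  : SqlOperandTypeInference = {

  new SqlOperandTypeInference {
    override def inferOperandTypes(
        callBinding: SqlCallBinding,
        returnType: RelDataType,
        operandTypes: Array[RelDataType]): Unit = {

      val operandTypeInfo = getOperandTypeInfo(callBinding)

      // validate the call against the eval() signatures of the function
      val foundSignature = getEvalMethodSignature(function, operandTypeInfo)
        .getOrElse(throw new ValidationException(
          s"Given parameters of function '$name' do not match any signature. \n" +
            s"Actual: ${signatureToString(operandTypeInfo)} \n" +
            s"Expected: ${signaturesToString(function, "eval")}"))

      val inferredTypes = foundSignature
        .map(TypeExtractor.getForClass(_))
        .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))

      // spread the last parameter over the trailing operands so that a
      // vararg (array-typed) parameter accepts any number of arguments
      for (i <- operandTypes.indices) {
        if (i < inferredTypes.length - 1) {
          operandTypes(i) = inferredTypes(i)
        } else if (null != inferredTypes.last.getComponentType) {
          operandTypes(i) = inferredTypes.last.getComponentType
        } else {
          operandTypes(i) = inferredTypes.last
        }
      }
    }
  }
}
```

`ScalarSqlFunction`, `TableSqlFunction`, and `AggSqlFunction` could then delegate to this helper instead of each carrying its own copy of the logic.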
> Support variable arguments for UDTF in SQL
> ------------------------------------------
>
> Key: FLINK-7439
> URL: https://issues.apache.org/jira/browse/FLINK-7439
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
>
> Currently, both UDF and UDAF support variable parameters, but UDTF does not.
> FLINK-5882 added variable arguments for UDTF in the Table API only, but missed SQL.
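For illustration, the kind of usage this issue asks for could look like the sketch below; the function, table, and column names are made up and not part of the issue.

```scala
import org.apache.flink.table.functions.TableFunction

// Hypothetical vararg UDTF (illustration only, not from the issue or PR).
class SplitAll extends TableFunction[String] {
  // @varargs generates a Java varargs eval(), which is what the SQL
  // validator needs to accept a variable number of arguments
  @scala.annotation.varargs
  def eval(values: String*): Unit = values.foreach(collect)
}

// Intended SQL usage once variable arguments work for UDTFs:
//   SELECT v
//   FROM MyTable, LATERAL TABLE(splitAll(a, b, c)) AS T(v)
```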
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)