Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3623#discussion_r108319900
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
 ---
    @@ -27,52 +27,73 @@ import 
org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
     import org.apache.flink.api.common.typeutils.CompositeType
     import org.apache.flink.table.api.TableException
     import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
    +import org.apache.flink.table.functions.{TableFunction => FlinkUDTF}
     
     /**
       * This is heavily inspired by Calcite's 
[[org.apache.calcite.schema.impl.TableFunctionImpl]].
       * We need it in order to create a 
[[org.apache.flink.table.functions.utils.TableSqlFunction]].
       * The main difference is that we override the [[getRowType()]] and 
[[getElementType()]].
    +  *
    +  * @param tableFunction The Table Function instance
    +  * @param implicitResultType The implicit result type.
    +  * @param evalMethod The eval() method of the [[tableFunction]]
       */
    -class FlinkTableFunctionImpl[T](
    -    val typeInfo: TypeInformation[T],
    -    val fieldIndexes: Array[Int],
    -    val fieldNames: Array[String],
    -    val evalMethod: Method)
    +class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_],
    +                                val implicitResultType: TypeInformation[_],
    +                                val evalMethod: Method)
       extends ReflectiveFunctionBase(evalMethod)
       with TableFunction {
     
    -  if (fieldIndexes.length != fieldNames.length) {
    -    throw new TableException(
    -      "Number of field indexes and field names must be equal.")
    -  }
    +  override def getElementType(arguments: util.List[AnyRef]): Type = 
classOf[Array[Object]]
     
    -  // check uniqueness of field names
    -  if (fieldNames.length != fieldNames.toSet.size) {
    -    throw new TableException(
    -      "Table field names must be unique.")
    +  override def getRowType(typeFactory: RelDataTypeFactory,
    +                          arguments: util.List[AnyRef]): RelDataType = {
    +
    +    // Get the result type from table function. If it is not null, the 
implicitResultType may
    +    // already be generated by Table API's apply() method.
    +    val resultType = if (tableFunction.getResultType(arguments) != null) {
    +      tableFunction.getResultType(arguments)
    +    } else {
    +      implicitResultType
    +    }
    +    val (fieldNames, fieldIndexes, _) = 
UserDefinedFunctionUtils.getFieldInfo(resultType)
    +    buildRelDataType(typeFactory, resultType, fieldNames, fieldIndexes)
       }
     
    -  val fieldTypes: Array[TypeInformation[_]] =
    -    typeInfo match {
    -      case cType: CompositeType[T] =>
    -        if (fieldNames.length != cType.getArity) {
    -          throw new TableException(
    -            s"Arity of type (" + cType.getFieldNames.deep + ") " +
    -              "not equal to number of field names " + fieldNames.deep + 
".")
    -        }
    -        
fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
    -      case aType: AtomicType[T] =>
    -        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
    -          throw new TableException(
    -            "Non-composite input type may have only a single field and its 
index must be 0.")
    -        }
    -        Array(aType)
    +  private [table] def buildRelDataType(typeFactory: RelDataTypeFactory,
    --- End diff --
    
    I think this method (`buildRelDataType`) can be moved into `UserDefinedFunctionUtils`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to