twalthr commented on a change in pull request #11034: [FLINK-15802][table]
Support new type inference for table functions
URL: https://github.com/apache/flink/pull/11034#discussion_r378232832
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
##########
@@ -60,7 +82,61 @@ class TableFunctionCallGen(tableFunction: TableFunction[_])
extends CallGenerato
operands: Seq[GeneratedExpression],
func: TableFunction[_]): Array[GeneratedExpression] = {
// get the expanded parameter types
- var paramClasses = getEvalMethodSignature(func,
operands.map(_.resultType).toArray)
+ val paramClasses = getEvalMethodSignature(func,
operands.map(_.resultType).toArray)
prepareFunctionArgs(ctx, operands, paramClasses,
func.getParameterTypes(paramClasses))
}
+
+ def getExternalDataType: DataType = {
+ val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+ val arguments =
UserDefinedFunctionUtils.transformRexNodes(rexCall.operands)
+ val operandTypes = rexCall.operands
+ .map(_.getType)
+ .map(FlinkTypeFactory.toLogicalType).toArray
+ val func = sqlFunction.makeFunction(arguments, operandTypes)
+ val argTypes = getEvalMethodSignature(
+ func,
+ rexCall.operands
+ .map(_.getType)
+ .map(FlinkTypeFactory.toLogicalType).toArray)
+ sqlFunction
+ .getFunction
+ .asInstanceOf[FlinkTableFunction]
+ .getExternalResultType(func, arguments, argTypes)
+ }
+
+ /**
+ * Generates a collector that converts the output of a table function
(possibly as an atomic type)
+ * into an internal row type. Returns a collector term for referencing the
collector.
+ */
+ def generateResultCollector(ctx: CodeGeneratorContext): String = {
+ val externalDataType = getExternalDataType
+ val pojoFieldMapping =
Some(UserDefinedFunctionUtils.getFieldInfo(externalDataType)._2)
+ val externalType = fromDataTypeToLogicalType(externalDataType)
+ val wrappedInternalType = PlannerTypeUtils.toRowType(externalType)
+
+ val collectorCtx = CodeGeneratorContext(ctx.tableConfig)
+ val externalTerm = newName("externalRecord")
+ val resultGenerator = new ExprCodeGenerator(collectorCtx, true)
+ .bindInput(externalType, externalTerm, pojoFieldMapping)
+ val wrappedResult = resultGenerator.generateConverterResultExpression(
Review comment:
good point, I changed the logic again. So we will skip the conversion for
structured types. This also means that we have different handling for
`collect(null)`. I think we should skip emitting in those cases.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services