[ 
https://issues.apache.org/jira/browse/FLINK-28830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575619#comment-17575619
 ] 

jackylau edited comment on FLINK-28830 at 8/5/22 7:46 AM:
----------------------------------------------------------

hi [~twalthr] 

The line
val functionResultType = FlinkTypeFactory.toLogicalRowType(rexCall.getType)
makes the function's return type a row type, and with the code generation below
the result is not converted from long to row data:
{code:java}
// code for wrapping atomic types
val collectorCode = if (!isCompositeType(outputType)) {
  val resultGenerator = new ExprCodeGenerator(collectorCtx, outputType.isNullable)
    .bindInput(outputType, externalResultTerm)
  val wrappedResult = resultGenerator.generateConverterResultExpression(
    returnType.asInstanceOf[RowType],
    classOf[GenericRowData])
  s"""
     |${wrappedResult.code}
     |outputResult(${wrappedResult.resultTerm});
     |""".stripMargin
} else {
  s"""
     |if ($externalResultTerm != null) {
     |  outputResult($externalResultTerm);
     |}
     |""".stripMargin
} {code}
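
For readers without the planner sources at hand, the two branches above can be mimicked in plain Java. This is only a rough sketch under stated assumptions: the class AtomicWrapSketch, the method emit, and the use of Object[] as a stand-in for row data are hypothetical and not part of Flink. It shows the intended behavior, where an atomic result such as a long is wrapped into a single-field row before being emitted, while a composite result is passed through when it is non-null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
public class AtomicWrapSketch {

    /**
     * Mimics the collector branch: rows are modeled as Object[].
     * compositeOutput plays the role of isCompositeType(outputType).
     */
    public static List<Object[]> emit(boolean compositeOutput, List<Object> results) {
        List<Object[]> out = new ArrayList<>();
        for (Object result : results) {
            if (!compositeOutput) {
                // Atomic result: wrap it into a single-field row.
                out.add(new Object[] {result});
            } else if (result != null) {
                // Composite result: already a row, forward it if non-null.
                out.add((Object[]) result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A table function that collects plain long values.
        List<Object> atomicResults = Arrays.<Object>asList(1L, 2L, 3L);
        for (Object[] row : emit(false, atomicResults)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

If the wrapping step is skipped for an atomic output type, the downstream operator receives a bare long where it expects row data, which matches the mismatch described above.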


was (Author: jackylau):
hi [~twalthr] 

> new stack udtf doesn't support atomic type 
> -------------------------------------------
>
>                 Key: FLINK-28830
>                 URL: https://issues.apache.org/jira/browse/FLINK-28830
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0
>            Reporter: jackylau
>            Priority: Major
>             Fix For: 1.16.0
>
>
> {code:java}
> public class GenerateSeriesFunction extends BuiltInTableFunction<Long> {
>     private static final long serialVersionUID = 1L;
>     public GenerateSeriesFunction(SpecializedContext specializedContext) {
>         super(BuiltInFunctionDefinitions.GENERATE_SERIES, specializedContext);
>     }
>     public void eval(long start, long stop) {
>         eval(start, stop, 1);
>     }
>     
>     public void eval(long start, long stop, long step) {
>         long s = start;
>         while (s <= stop) {
>             collect(s);
>             s += step;
>         }
>     }
> }
> public static final BuiltInFunctionDefinition GENERATE_SERIES =
>         BuiltInFunctionDefinition.newBuilder()
>                 .name("GENERATE_SERIES")
>                 .kind(TABLE)
>                 .inputTypeStrategy(
>                         or(
>                                 sequence(
>                                         logical(LogicalTypeFamily.NUMERIC),
>                                         logical(LogicalTypeFamily.NUMERIC)),
>                                 sequence(
>                                         logical(LogicalTypeFamily.NUMERIC),
>                                         logical(LogicalTypeFamily.NUMERIC),
>                                         logical(LogicalTypeFamily.NUMERIC))))
>                 .outputTypeStrategy(explicit(DataTypes.BIGINT()))
>                 .runtimeClass(
>                         "org.apache.flink.table.runtime.functions.table.GenerateSeriesFunction")
>                 .build(); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
