[
https://issues.apache.org/jira/browse/FLINK-28830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575619#comment-17575619
]
jackylau edited comment on FLINK-28830 at 8/5/22 7:46 AM:
----------------------------------------------------------
Hi [~twalthr],

the cause is this line:

val functionResultType = FlinkTypeFactory.toLogicalRowType(rexCall.getType)

It turns the function's return type into a row type, so the code generation below does not translate the atomic long result into row data:
{code:java}
// code for wrapping atomic types
val collectorCode = if (!isCompositeType(outputType)) {
  val resultGenerator = new ExprCodeGenerator(collectorCtx, outputType.isNullable)
    .bindInput(outputType, externalResultTerm)
  val wrappedResult = resultGenerator.generateConverterResultExpression(
    returnType.asInstanceOf[RowType],
    classOf[GenericRowData])
  s"""
     |${wrappedResult.code}
     |outputResult(${wrappedResult.resultTerm});
     |""".stripMargin
} else {
  s"""
     |if ($externalResultTerm != null) {
     |  outputResult($externalResultTerm);
     |}
     |""".stripMargin
}
{code}
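To illustrate the behaviour the snippet above is meant to provide, here is a minimal plain-Java sketch of wrapping an atomic result into a single-field row before it is emitted. It does not use Flink: Row, RowCollector, outputResult and generateSeries are hypothetical stand-ins, and the isComposite flag only mimics the isCompositeType(outputType) check, so treat it as an approximation of the idea rather than the planner's actual logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; none of these names are Flink classes. */
final class AtomicResultWrapping {

    /** Minimal stand-in for a row: an ordered array of field values. */
    static final class Row {
        final Object[] fields;

        Row(Object... fields) {
            this.fields = fields;
        }

        @Override
        public String toString() {
            return "Row" + Arrays.toString(fields);
        }
    }

    /** Collects rows in the shape a downstream operator would expect. */
    static final class RowCollector {
        final List<Row> rows = new ArrayList<>();

        /**
         * Atomic results are wrapped into a one-field row; composite results
         * are assumed to be rows already and are forwarded when non-null.
         */
        void outputResult(Object result, boolean isComposite) {
            if (!isComposite) {
                rows.add(new Row(result));
            } else if (result != null) {
                rows.add((Row) result);
            }
        }
    }

    /** Emits start, start + step, ... up to and including stop as atomic longs. */
    static List<Row> generateSeries(long start, long stop, long step) {
        if (step <= 0) {
            // Assumption of this sketch: reject steps that would never reach stop.
            throw new IllegalArgumentException("step must be positive");
        }
        RowCollector collector = new RowCollector();
        for (long s = start; s <= stop; s += step) {
            collector.outputResult(s, false); // long is atomic, so it gets wrapped
        }
        return collector.rows;
    }
}
```

Calling AtomicResultWrapping.generateSeries(1, 5, 2) in this sketch yields three single-field rows, one per emitted long.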
was (Author: jackylau):
hi [~twalthr]
> new stack udtf doesn't support atomic type
> -------------------------------------------
>
> Key: FLINK-28830
> URL: https://issues.apache.org/jira/browse/FLINK-28830
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.16.0
> Reporter: jackylau
> Priority: Major
> Fix For: 1.16.0
>
>
> {code:java}
> public class GenerateSeriesFunction extends BuiltInTableFunction<Long> {
>     private static final long serialVersionUID = 1L;
>
>     public GenerateSeriesFunction(SpecializedContext specializedContext) {
>         super(BuiltInFunctionDefinitions.GENERATE_SERIES, specializedContext);
>     }
>
>     public void eval(long start, long stop) {
>         eval(start, stop, 1);
>     }
>
>     public void eval(long start, long stop, long step) {
>         long s = start;
>         while (s <= stop) {
>             collect(s);
>             s += step;
>         }
>     }
> }
>
> public static final BuiltInFunctionDefinition GENERATE_SERIES =
>         BuiltInFunctionDefinition.newBuilder()
>                 .name("GENERATE_SERIES")
>                 .kind(TABLE)
>                 .inputTypeStrategy(
>                         or(
>                                 sequence(
>                                         logical(LogicalTypeFamily.NUMERIC),
>                                         logical(LogicalTypeFamily.NUMERIC)),
>                                 sequence(
>                                         logical(LogicalTypeFamily.NUMERIC),
>                                         logical(LogicalTypeFamily.NUMERIC),
>                                         logical(LogicalTypeFamily.NUMERIC))))
>                 .outputTypeStrategy(explicit(DataTypes.BIGINT()))
>                 .runtimeClass(
>                         "org.apache.flink.table.runtime.functions.table.GenerateSeriesFunction")
>                 .build();
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)