[
https://issues.apache.org/jira/browse/FLINK-23671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timo Walther reassigned FLINK-23671:
------------------------------------
Assignee: Timo Walther
> Failed to inference type in correlate
> --------------------------------------
>
> Key: FLINK-23671
> URL: https://issues.apache.org/jira/browse/FLINK-23671
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.13.2
> Reporter: Shengkai Fang
> Assignee: Timo Walther
> Priority: Major
> Attachments: screenshot-1.png
>
>
> Please also turn off the assert by running the query *without* jvm parameter
> -ea .
> !screenshot-1.png!
> {code:java}
> CREATE FUNCTION func111 AS
> 'org.apache.flink.table.client.gateway.utils.CPDetailOriginMatchV2UDF';
> CREATE TABLE side(
> `id2` VARCHAR,
> PRIMARY KEY (`id2`) NOT ENFORCED
> ) WITH (
> 'connector' = 'values'
> );
> CREATE TABLE main(
> `id` VARCHAR,
> `proctime` as proctime()
> ) WITH (
> 'connector' = 'datagen',
> 'number-of-rows' = '10'
> );
> CREATE TABLE blackhole(
> `id` VARCHAR
> ) WITH (
> 'connector' = 'blackhole'
> );
> INSERT INTO blackhole
> SELECT `id`
> FROM main
> LEFT JOIN side FOR SYSTEM_TIME AS OF main.`proctime` ON main.`id` = side.`id2`
> INNER join lateral table(func111(side.`id2`)) as T(`is_match`,
> `match_bizline`, `match_page_id`, `source_type`) ON 1 = 1;
> {code}
> The implementation of the udf is as follow
> {code:java}
> package org.apache.flink.table.client.gateway.utils;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.catalog.DataTypeFactory;
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.table.types.DataType;
> import org.apache.flink.table.types.inference.TypeInference;
> import org.apache.flink.types.Row;
> import java.util.Optional;
> public class CPDetailOriginMatchV2UDF extends TableFunction<Row> {
> public void eval(String original) {
> collect(null);
> }
> // is_matched, match_bizline, match_page_id, scene
> @Override
> public TypeInference getTypeInference(DataTypeFactory typeFactory) {
> return TypeInference.newBuilder()
> .outputTypeStrategy(
> callContext -> {
> DataType[] array = new DataType[4];
> array[0] = DataTypes.BOOLEAN();
> array[1] = DataTypes.STRING();
> // page_id 是Long类型, BIGINT 是否可以支持?
> array[2] = DataTypes.BIGINT();
> array[3] = DataTypes.STRING();
> return Optional.of(DataTypes.ROW(array));
> })
> .build();
> }
> }
> {code}
> The exception stack as follows.
> {code:java}
> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of
> function's argument data type 'STRING NOT NULL' and actual argument type
> 'STRING'.
> at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:323)
> at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:320)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.verifyArgumentTypes(BridgingFunctionGenUtil.scala:320)
> at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:95)
> at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:65)
> at
> org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:73)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:861)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:537)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:57)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:157)
> at
> org.apache.flink.table.planner.codegen.CorrelateCodeGenerator$.generateOperator(CorrelateCodeGenerator.scala:127)
> at
> org.apache.flink.table.planner.codegen.CorrelateCodeGenerator$.generateCorrelateTransformation(CorrelateCodeGenerator.scala:75)
> at
> org.apache.flink.table.planner.codegen.CorrelateCodeGenerator.generateCorrelateTransformation(CorrelateCodeGenerator.scala)
> at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate.translateToPlanInternal(CommonExecCorrelate.java:102)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:210)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:289)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.lambda$translateInputToPlan$5(ExecNodeBase.java:244)
>
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)