[ 
https://issues.apache.org/jira/browse/FLINK-23671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395717#comment-17395717
 ] 

Shengkai Fang commented on FLINK-23671:
---------------------------------------

[~twalthr] I have updated the description.

> Failed to infer type in correlate
> --------------------------------------
>
>                 Key: FLINK-23671
>                 URL: https://issues.apache.org/jira/browse/FLINK-23671
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: Shengkai Fang
>            Priority: Major
>
> {code:java}
> CREATE FUNCTION func111 AS 
> 'org.apache.flink.table.client.gateway.utils.CPDetailOriginMatchV2UDF';
> CREATE TABLE side(
>   `id2` VARCHAR,
>   PRIMARY KEY (`id2`) NOT ENFORCED
> ) WITH (
>   'connector' = 'values'
> );
> CREATE TABLE main(
>   `id` VARCHAR,
>   `proctime` as proctime()
> ) WITH (
>   'connector' = 'datagen',
>   'number-of-rows' = '10'
> );
> CREATE TABLE blackhole(
>   `id` VARCHAR
> ) WITH (
>   'connector' = 'blackhole'
> );
> INSERT INTO blackhole
> SELECT `id`
> FROM main
> JOIN side FOR SYSTEM_TIME AS OF main.`proctime` ON main.`id` = side.`id2`
> INNER join lateral table(func111(side.`id2`)) as T(`is_match`, 
> `match_bizline`, `match_page_id`, `source_type`) ON 1 = 1;
> {code}
> The implementation of the UDF is as follows:
> {code:java}
> package org.apache.flink.table.client.gateway.utils;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.catalog.DataTypeFactory;
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.table.types.DataType;
> import org.apache.flink.table.types.inference.TypeInference;
> import org.apache.flink.types.Row;
> import java.util.Optional;
> public class CPDetailOriginMatchV2UDF extends TableFunction<Row> {
>     public void eval(String original) {
>         collect(null);
>     }
>     // is_matched, match_bizline, match_page_id, scene
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(
>                         callContext -> {
>                             DataType[] array = new DataType[4];
>                             array[0] = DataTypes.BOOLEAN();
>                             array[1] = DataTypes.STRING();
>                             // page_id is of type Long; can BIGINT be supported here?
>                             array[2] = DataTypes.BIGINT();
>                             array[3] = DataTypes.STRING();
>                             return Optional.of(DataTypes.ROW(array));
>                         })
>                 .build();
>     }
> }
> {code}
> The exception stack trace is as follows:
> {code:java}
> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of 
> function's argument data type 'STRING NOT NULL' and actual argument type 
> 'STRING'.
>       at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:323)
>       at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:320)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.verifyArgumentTypes(BridgingFunctionGenUtil.scala:320)
>       at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:95)
>       at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:65)
>       at 
> org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:73)
>       at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:861)
>       at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:537)
>       at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:57)
>       at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>       at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:157)
>       at 
> org.apache.flink.table.planner.codegen.CorrelateCodeGenerator$.generateOperator(CorrelateCodeGenerator.scala:127)
>       at 
> org.apache.flink.table.planner.codegen.CorrelateCodeGenerator$.generateCorrelateTransformation(CorrelateCodeGenerator.scala:75)
>       at 
> org.apache.flink.table.planner.codegen.CorrelateCodeGenerator.generateCorrelateTransformation(CorrelateCodeGenerator.scala)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate.translateToPlanInternal(CommonExecCorrelate.java:102)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:210)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:289)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.lambda$translateInputToPlan$5(ExecNodeBase.java:244)
>       
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to