[ 
https://issues.apache.org/jira/browse/FLINK-30201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758852#comment-17758852
 ] 

Yunhong Zheng commented on FLINK-30201:
---------------------------------------

Hi, [~mooonzhang]. I don't think this is a bug. You need to reference the 
column by its full name, 'TableName.ColumnName.SubColumnName', instead of 
'ColumnName.SubColumnName'. In your case, use 
'riskRuleEngineResultLevel2_3.data.rule_results'.
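
To make the suggestion concrete, here is a minimal sketch of how the query 
string from the issue might look with the table-qualified name inside UNNEST. 
The class name UnnestQualifiedName and the helper buildQuery are hypothetical 
and only build the SQL text; the sketch has not been run against Flink 1.16.0.

```java
// Hypothetical helper, not part of Flink: it only assembles the SQL text.
class UnnestQualifiedName {

    // Builds the query from the issue, but passes the fully qualified
    // 'TableName.ColumnName.SubColumnName' path to UNNEST.
    static String buildQuery(String table) {
        return "select data.flow_id as flow_id,"
            + "t.rule_id,t.rule_name,t.rule_type,t.rule_type_name,t.node_id,t.`result`"
            + " from " + table + ", unnest(" + table + ".data.rule_results) as t "
            + "(rule_id,rule_name,rule_type,rule_type_name,node_id,`result`,policy_name,in_path)";
    }

    public static void main(String[] args) {
        // In a real job this string would be passed to TableEnvironment#executeSql.
        System.out.println(buildQuery("riskRuleEngineResultLevel2_3"));
    }
}
```

With the second DDL, which also defines "data2", the same qualified form 
should identify which "rule_results" is meant, assuming the suggestion above 
applies there as well.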

> Function "unnest" can't process nesting JSON properly
> -----------------------------------------------------
>
>                 Key: FLINK-30201
>                 URL: https://issues.apache.org/jira/browse/FLINK-30201
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0
>            Reporter: zhangyue
>            Priority: Major
>
> Here is the CREATE TABLE DDL:
> {code:java}
> riskRuleEngineResultLevel2_3 = CREATE TABLE `riskRuleEngineResultLevel2_3`(\
>         `data` ROW<\
>             `flow_id` STRING, \
>             `flow_name` STRING, \
>             `flow_version` STRING, \
>             `risk_id` BIGINT, \
>             `uid` BIGINT, \
>             `is_pass` INT, \
>             `result` INT, \
>             `country_id` INT, \
>             `business` STRING, \
>             `engine_scene_id` STRING, \
>             `flow_type` STRING, \
>             `source` STRING, \
>             `rule_results` ARRAY<ROW<`rule_id` STRING, \
>                                 `rule_name` STRING, \
>                                 `rule_type` STRING, \
>                                 `rule_type_name` STRING, \
>                                 `node_id` STRING, \
>                                 `result` INT, \
>                                 `policy_name` STRING, \
>                                 `in_path` BOOLEAN>>\
>             >,\
>         proctime as proctime()\
>         ) WITH (\
>             'connector' = 'kafka',\
>             'topic' = 'riskRuleEngineResultLevel2_3',\
>             'scan.startup.mode' = '%s',\
>             'properties.bootstrap.servers' = '%s',\
>             'properties.group.id' = '%s',\
>             'format' = 'json'\
>         ) {code}
> Flink SQL:
> {code:java}
> String executeSql = "select data.flow_id as flow_id,"
>     + "t.rule_id,t.rule_name,t.rule_type,t.rule_type_name,t.node_id,t.`result`"
>     + " from riskRuleEngineResultLevel2_3, unnest(data.rule_results) as t "
>     + "(rule_id,rule_name,rule_type,rule_type_name,node_id,`result`,policy_name,in_path)";
>  {code}
> When the argument to the "unnest" function is "data.rule_results", which is 
> actually the correct structure, the error below occurs. When I use 
> "rule_results" instead of "data.rule_results" in the "unnest" function, it 
> works. I think this is weird.
> {code:java}
> // Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 0, column 0 to line 1, column 149: Column 
> 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
>     at 
> com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, 
> column 0 to line 1, column 149: Column 'data.data' not found in table 
> 'riskRuleEngineResultLevel2_3'
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>     at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
>     at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
>     at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
>     at 
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
>     ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 
> 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>     at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>     ... 28 more {code}
> If I change the DDL by adding "data2", which has the same structure as 
> "data", at the same level, the error occurs whether I use "rule_results" or 
> "data.rule_results" in the "unnest" function:
> {code:java}
> riskRuleEngineResultLevel2_3 = CREATE TABLE `riskRuleEngineResultLevel2_3`(\
>         `data` ROW<\
>             `flow_id` STRING, \
>             `flow_name` STRING, \
>             `flow_version` STRING, \
>             `risk_id` BIGINT, \
>             `uid` BIGINT, \
>             `is_pass` INT, \
>             `result` INT, \
>             `country_id` INT, \
>             `business` STRING, \
>             `engine_scene_id` STRING, \
>             `flow_type` STRING, \
>             `source` STRING, \
>             `rule_results` ARRAY<ROW<`rule_id` STRING, \
>                                 `rule_name` STRING, \
>                                 `rule_type` STRING, \
>                                 `rule_type_name` STRING, \
>                                 `node_id` STRING, \
>                                 `result` INT, \
>                                 `policy_name` STRING, \
>                                 `in_path` BOOLEAN>>\
>             >,\
>         `data2` ROW<\
>             `flow_id` STRING, \
>             `flow_name` STRING, \
>             `flow_version` STRING, \
>             `risk_id` BIGINT, \
>             `uid` BIGINT, \
>             `is_pass` INT, \
>             `result` INT, \
>             `country_id` INT, \
>             `business` STRING, \
>             `engine_scene_id` STRING, \
>             `flow_type` STRING, \
>             `source` STRING, \
>             `rule_results` ARRAY<ROW<`rule_id` STRING, \
>                                 `rule_name` STRING, \
>                                 `rule_type` STRING, \
>                                 `rule_type_name` STRING, \
>                                 `node_id` STRING, \
>                                 `result` INT, \
>                                 `policy_name` STRING, \
>                                 `in_path` BOOLEAN>>\
>             >,\
>         proctime as proctime()\
>         ) WITH (\
>             'connector' = 'kafka',\
>             'topic' = 'riskRuleEngineResultLevel2_3',\
>             'scan.startup.mode' = '%s',\
>             'properties.bootstrap.servers' = '%s',\
>             'properties.group.id' = '%s',\
>             'format' = 'json'\
>         ) {code}
> Error when "rule_results" is used in "unnest":
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 146 to line 1, column 157: Column 
> 'rule_results' is ambiguous
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
>     at 
> com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 146 to line 1, column 157: Column 'rule_results' is ambiguous
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>     at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
>     at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:467)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:2921)
>     at 
> org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:300)
>     at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:419)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5404)
>     at 
> org.apache.calcite.sql.validate.UnnestNamespace.validateImpl(UnnestNamespace.java:64)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowTypeSansSystemColumns(AbstractNamespace.java:122)
>     at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:69)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>     at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:43)
>     at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>     at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:190)
>     at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:155)
>     at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
>     at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
>     at 
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
>     ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 
> 'rule_results' is ambiguous
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>     at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>     ... 46 more{code}
> Error when "data.rule_results" is used in "unnest":
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 0, column 0 to line 1, column 149: Column 
> 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
>     at 
> com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, 
> column 0 to line 1, column 149: Column 'data.data' not found in table 
> 'riskRuleEngineResultLevel2_3'
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>     at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
>     at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
>     at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
>     at 
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
>     ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 
> 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>     at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>     at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>     ... 28 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)