[
https://issues.apache.org/jira/browse/FLINK-30201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758852#comment-17758852
]
Yunhong Zheng commented on FLINK-30201:
---------------------------------------
Hi [~mooonzhang], I don't think this is a bug. You need to reference the
column by its full name, i.e. 'TableName.ColumnName.SubColumnName' instead of
'ColumnName.SubColumnName'. In your case, you need to use
'riskRuleEngineResultLevel2_3.data.rule_results'.
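A minimal sketch of what that change could look like, assuming the rest of the query stays as in the report. The class and method names (UnnestQueryExample, buildQuery) are hypothetical and only illustrate how the string is built; this has not been run against a Flink 1.16 cluster.

```java
// Illustrative only: class and method names are hypothetical, not from the issue.
public class UnnestQueryExample {

    // Builds the query from the report, but passes UNNEST a column path that
    // starts with the table name, as suggested in the comment above.
    static String buildQuery(String tableName) {
        String nestedArray = tableName + ".data.rule_results";
        return "SELECT data.flow_id AS flow_id, "
                + "t.rule_id, t.rule_name, t.rule_type, t.rule_type_name, t.node_id, t.`result` "
                + "FROM " + tableName + ", UNNEST(" + nestedArray + ") AS t "
                + "(rule_id, rule_name, rule_type, rule_type_name, node_id, `result`, policy_name, in_path)";
    }

    public static void main(String[] args) {
        // In the reporter's job, this string would be passed to TableEnvironment.executeSql(...).
        System.out.println(buildQuery("riskRuleEngineResultLevel2_3"));
    }
}
```

The only difference from the reported query is the argument to UNNEST; the select list and the column aliases for t are unchanged.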
> Function "unnest" can't process nesting JSON properly
> -----------------------------------------------------
>
> Key: FLINK-30201
> URL: https://issues.apache.org/jira/browse/FLINK-30201
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.0
> Reporter: zhangyue
> Priority: Major
>
> Here is the CREATE TABLE DDL:
> {code:java}
> riskRuleEngineResultLevel2_3 = CREATE TABLE `riskRuleEngineResultLevel2_3`(\
> `data` ROW<\
> `flow_id` STRING, \
> `flow_name` STRING, \
> `flow_version` STRING, \
> `risk_id` BIGINT, \
> `uid` BIGINT, \
> `is_pass` INT, \
> `result` INT, \
> `country_id` INT, \
> `business` STRING, \
> `engine_scene_id` STRING, \
> `flow_type` STRING, \
> `source` STRING, \
> `rule_results` ARRAY<ROW<`rule_id` STRING, \
> `rule_name` STRING, \
> `rule_type` STRING, \
> `rule_type_name` STRING, \
> `node_id` STRING, \
> `result` INT, \
> `policy_name` STRING, \
> `in_path` BOOLEAN>>\
> >,\
> proctime as proctime()\
> ) WITH (\
> 'connector' = 'kafka',\
> 'topic' = 'riskRuleEngineResultLevel2_3',\
> 'scan.startup.mode' = '%s',\
> 'properties.bootstrap.servers' = '%s',\
> 'properties.group.id' = '%s',\
> 'format' = 'json'\
> ) {code}
> Flink SQL:
> {code:java}
> String executeSql = "select data.flow_id as flow_id,"
>     + "t.rule_id,t.rule_name,t.rule_type,t.rule_type_name,t.node_id,t.`result` "
>     + "from riskRuleEngineResultLevel2_3, unnest(data.rule_results) as t "
>     + "(rule_id,rule_name,rule_type,rule_type_name,node_id,`result`,policy_name,in_path)";
> {code}
> When the argument to the "unnest" function is "data.rule_results", which is
> actually the correct structure, the error below occurs. When I use
> "rule_results" instead of "data.rule_results" in the "unnest" function, it
> works. I think this is weird.
> {code:java}
> // Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. From line 0, column 0 to line 1, column 149: Column
> 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
> at
> com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0,
> column 0 to line 1, column 149: Column 'data.data' not found in table
> 'riskRuleEngineResultLevel2_3'
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
> at
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
> at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
> ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
> 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
> ... 28 more {code}
> If I change the DDL as below, adding "data2" with the same structure as
> "data" at the same level, an error occurs whether I use "rule_results" or
> "data.rule_results" in the "unnest" function:
> {code:java}
> riskRuleEngineResultLevel2_3 = CREATE TABLE `riskRuleEngineResultLevel2_3`(\
> `data` ROW<\
> `flow_id` STRING, \
> `flow_name` STRING, \
> `flow_version` STRING, \
> `risk_id` BIGINT, \
> `uid` BIGINT, \
> `is_pass` INT, \
> `result` INT, \
> `country_id` INT, \
> `business` STRING, \
> `engine_scene_id` STRING, \
> `flow_type` STRING, \
> `source` STRING, \
> `rule_results` ARRAY<ROW<`rule_id` STRING, \
> `rule_name` STRING, \
> `rule_type` STRING, \
> `rule_type_name` STRING, \
> `node_id` STRING, \
> `result` INT, \
> `policy_name` STRING, \
> `in_path` BOOLEAN>>\
> >,\
> `data2` ROW<\
> `flow_id` STRING, \
> `flow_name` STRING, \
> `flow_version` STRING, \
> `risk_id` BIGINT, \
> `uid` BIGINT, \
> `is_pass` INT, \
> `result` INT, \
> `country_id` INT, \
> `business` STRING, \
> `engine_scene_id` STRING, \
> `flow_type` STRING, \
> `source` STRING, \
> `rule_results` ARRAY<ROW<`rule_id` STRING, \
> `rule_name` STRING, \
> `rule_type` STRING, \
> `rule_type_name` STRING, \
> `node_id` STRING, \
> `result` INT, \
> `policy_name` STRING, \
> `in_path` BOOLEAN>>\
> >,\
> proctime as proctime()\
> ) WITH (\
> 'connector' = 'kafka',\
> 'topic' = 'riskRuleEngineResultLevel2_3',\
> 'scan.startup.mode' = '%s',\
> 'properties.bootstrap.servers' = '%s',\
> 'properties.group.id' = '%s',\
> 'format' = 'json'\
> ) {code}
> Error when "rule_results" is used in "unnest":
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. From line 1, column 146 to line 1, column 157: Column
> 'rule_results' is ambiguous
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
> at
> com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
> column 146 to line 1, column 157: Column 'rule_results' is ambiguous
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
> at
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:467)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:2921)
> at
> org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:300)
> at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:419)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5404)
> at
> org.apache.calcite.sql.validate.UnnestNamespace.validateImpl(UnnestNamespace.java:64)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.getRowTypeSansSystemColumns(AbstractNamespace.java:122)
> at
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:69)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
> at
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:43)
> at
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
> at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:190)
> at
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:155)
> at
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
> at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
> ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
> 'rule_results' is ambiguous
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
> ... 46 more{code}
> Error when "data.rule_results" is used in "unnest":
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. From line 0, column 0 to line 1, column 149: Column
> 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
> at
> com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0,
> column 0 to line 1, column 149: Column 'data.data' not found in table
> 'riskRuleEngineResultLevel2_3'
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
> at
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
> at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
> ... 5 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
> 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
> ... 28 more {code}