[ https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-22082:
-----------------------------------
Labels: pull-request-available (was: )
> Nested projection push down doesn't work for data such as row(array(row))
> -------------------------------------------------------------------------
>
> Key: FLINK-22082
> URL: https://issues.apache.org/jira/browse/FLINK-22082
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.0, 1.13.0
> Reporter: Dian Fu
> Assignee: Shengkai Fang
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.13.0, 1.12.3
>
>
> For the following job:
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment
> config = TableConfig()
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env, config)
> source_ddl = """
> CREATE TABLE InTable (
> `ID` STRING,
> `Timestamp` TIMESTAMP(3),
> `Result` ROW(
> `data` ROW(`value` BIGINT) ARRAY),
> WATERMARK FOR `Timestamp` AS `Timestamp`
> ) WITH (
> 'connector' = 'filesystem',
> 'format' = 'json',
> 'path' = '/tmp/1.txt'
> )
> """
> sink_ddl = """
> CREATE TABLE OutTable (
> `ID` STRING,
> `value` BIGINT
> ) WITH (
> 'connector' = 'print'
> )
> """
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> table = t_env.from_path('InTable')
> table \
> .select(
> table.ID,
> table.Result.data.at(1).value) \
> .execute_insert('OutTable') \
> .wait()
> {code}
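The reproducer reads `/tmp/1.txt`, but the report does not include that file. Below is a minimal sketch of input that would match the `InTable` schema. The records and the `write_sample` and `expected_output` helpers are hypothetical. The timestamp strings assume the JSON format's default `json.timestamp-format.standard` of `SQL`. The script writes newline-delimited JSON and computes the `(ID, value)` pairs the query would be expected to produce once the planner bug is fixed. `.at(1)` is 1-based, so it selects the first array element.

```python
import json

# Hypothetical sample records matching the InTable schema:
#   ID STRING, Timestamp TIMESTAMP(3), Result ROW<data ARRAY<ROW<value BIGINT>>>
SAMPLE_RECORDS = [
    {
        "ID": "a",
        # Assumes the JSON format's default SQL-style timestamp parsing
        "Timestamp": "2021-03-31 12:00:00.000",
        "Result": {"data": [{"value": 10}, {"value": 20}]},
    },
    {
        "ID": "b",
        "Timestamp": "2021-03-31 12:00:01.000",
        "Result": {"data": [{"value": 30}]},
    },
]


def write_sample(path: str) -> None:
    """Write one JSON object per line (newline-delimited JSON)."""
    with open(path, "w", encoding="utf-8") as f:
        for record in SAMPLE_RECORDS:
            f.write(json.dumps(record) + "\n")


def expected_output(records):
    """(ID, value of the first array element) for each record.

    SQL array access via ITEM / .at(1) is 1-based, so index 1 maps to
    Python index 0.
    """
    return [(r["ID"], r["Result"]["data"][0]["value"]) for r in records]


if __name__ == "__main__":
    write_sample("/tmp/1.txt")
    print(expected_output(SAMPLE_RECORDS))  # [('a', 10), ('b', 30)]
```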
> It throws the following exception:
> {code}
> : scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall)
> at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
> at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
> at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
> at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
> at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
> at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
> at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
> at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
> at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
> {code}
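The trace shows `NestedSchemaExtractor.internalVisit` failing with a `scala.MatchError` when the operand of a field access is the call `ITEM($2.data, 1)`. This suggests the match apparently only covers input references and nested field accesses. The Python sketch below is a simplified, hypothetical model of that situation and is not Flink's code. `InputRef`, `FieldAccess`, `Call` and `Literal` are toy stand-ins for Calcite's Rex nodes. `naive_path` fails in the same way the trace does. `tolerant_paths` shows one possible way to avoid the failure: it records only the input fields beneath the call (here `$2.data`). This report does not say whether the actual fix for FLINK-22082 works like this.

```python
from dataclasses import dataclass
from typing import List, Optional


# Toy stand-ins for Calcite's RexInputRef, RexFieldAccess, RexCall, RexLiteral.
@dataclass(frozen=True)
class InputRef:
    index: int


@dataclass(frozen=True)
class FieldAccess:
    expr: object
    name: str


@dataclass(frozen=True)
class Call:
    op: str
    operands: tuple


@dataclass(frozen=True)
class Literal:
    value: object


def naive_path(expr) -> List[str]:
    """Handles only InputRef / FieldAccess chains; any other node is unmatched."""
    if isinstance(expr, InputRef):
        return [f"${expr.index}"]
    if isinstance(expr, FieldAccess):
        return naive_path(expr.expr) + [expr.name]
    raise TypeError(f"no case for {expr!r}")  # analogous to scala.MatchError


def pure_path(expr) -> Optional[List[str]]:
    """Return the nested path if expr is a pure InputRef/FieldAccess chain, else None."""
    if isinstance(expr, InputRef):
        return [f"${expr.index}"]
    if isinstance(expr, FieldAccess):
        prefix = pure_path(expr.expr)
        return None if prefix is None else prefix + [expr.name]
    return None


def tolerant_paths(expr) -> List[List[str]]:
    """Collect referenced input paths, recursing into calls instead of failing."""
    path = pure_path(expr)
    if path is not None:
        return [path]
    if isinstance(expr, FieldAccess):
        # e.g. ITEM($2.data, 1).value: the access sits on a call result,
        # so only the input fields used by that call are recorded.
        return tolerant_paths(expr.expr)
    if isinstance(expr, Call):
        return [p for op in expr.operands for p in tolerant_paths(op)]
    return []  # literals reference no input fields


# ITEM($2.data, 1).value, as in the failing query
expr = FieldAccess(Call("ITEM", (FieldAccess(InputRef(2), "data"), Literal(1))), "value")
print(tolerant_paths(expr))  # [['$2', 'data']]
```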
> See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)