[ https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17312880#comment-17312880 ]
Dian Fu commented on FLINK-22082:
---------------------------------
Does the Kafka source support nested projection pushdown? The original job
actually uses the Kafka source:
https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array
I just changed it to the filesystem connector for ease of validation
(FileSystemTableSource is bundled in the blink planner). As for the exception
itself, it seems to have nothing to do with which source is used.
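For reference, a Kafka-based version of the source table from the job quoted below would look roughly like this; the topic, bootstrap servers and group id are placeholders, not values from the original job:
{code}
CREATE TABLE InTable (
    `ID` STRING,
    `Timestamp` TIMESTAMP(3),
    `Result` ROW(
        `data` ROW(`value` BIGINT) ARRAY),
    WATERMARK FOR `Timestamp` AS `Timestamp`
) WITH (
    'connector' = 'kafka',
    -- the values below are placeholders, not taken from the original job
    'topic' = 'in-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
{code}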
> Nested projection push down doesn't work for data such as row(array(row))
> -------------------------------------------------------------------------
>
> Key: FLINK-22082
> URL: https://issues.apache.org/jira/browse/FLINK-22082
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.0, 1.13.0
> Reporter: Dian Fu
> Priority: Major
>
> For the following job:
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment
> config = TableConfig()
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env, config)
> source_ddl = """
>     CREATE TABLE InTable (
>         `ID` STRING,
>         `Timestamp` TIMESTAMP(3),
>         `Result` ROW(
>             `data` ROW(`value` BIGINT) ARRAY),
>         WATERMARK FOR `Timestamp` AS `Timestamp`
>     ) WITH (
>         'connector' = 'filesystem',
>         'format' = 'json',
>         'path' = '/tmp/1.txt'
>     )
> """
> sink_ddl = """
>     CREATE TABLE OutTable (
>         `ID` STRING,
>         `value` BIGINT
>     ) WITH (
>         'connector' = 'print'
>     )
> """
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> table = t_env.from_path('InTable')
> table \
>     .select(
>         table.ID,
>         table.Result.data.at(1).value) \
>     .execute_insert('OutTable') \
>     .wait()
> {code}
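> For /tmp/1.txt, an input line matching the declared schema would look roughly as follows (the values are made up, only the shape of row(array(row)) matters):
> {code}
> {"ID": "a", "Timestamp": "2021-04-05 12:00:00", "Result": {"data": [{"value": 1}]}}
> {code}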
> It will throw the following exception:
> {code}
> : scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall)
>     at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
>     at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
>     at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
>     at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
>     at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
>     at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
>     at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
>     at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
>     at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
{code}
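> The failure occurs while ProjectWatermarkAssignerTransposeRule collects the fields used in the top-level projection and watermark assigner (NestedProjectionUtil), so the problem should be reproducible from plain SQL as well; an untested sketch of an equivalent query against the same tables:
> {code}
> t_env.execute_sql("""
>     INSERT INTO OutTable
>     SELECT `ID`, `Result`.`data`[1].`value`
>     FROM InTable
> """).wait()
> {code}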
> See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details.