[
https://issues.apache.org/jira/browse/FLINK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu updated FLINK-22541:
----------------------------
Component/s: Table SQL / Ecosystem
> add json format filter params
> ------------------------------
>
> Key: FLINK-22541
> URL: https://issues.apache.org/jira/browse/FLINK-22541
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table
> SQL / Ecosystem
> Affects Versions: 1.11.0, 1.12.0
> Reporter: sandy du
> Priority: Minor
>
> In my case, one Kafka topic stores data for multiple tables, for example:
>
> {"id":"121","source":"users","content":{"name":"test01","age":20,"addr":"addr1"}}
>
> {"id":"122","source":"users","content":{"name":"test02","age":23,"addr":"addr2"}}
>
> {"id":"124","source":"users","content":{"name":"test03","age":34,"addr":"addr3"}}
>
> {"id":"124","source":"order","content":{"orderId":"100001","price":34,"addr":"addr1231"}}
>
> {"id":"125","source":"order","content":{"orderId":"100002","price":34,"addr":"addr1232"}}
>
> {"id":"126","source":"order","content":{"orderId":"100003","price":34,"addr":"addr1233"}}
>
> I only want to consume the data for the order table. The Flink SQL DDL would look like this:
> CREATE TABLE `order` (
> orderId STRING,
> price INT,
> addr STRING
> )
> with (
> 'connector'='kafka',
> 'topic'='kafkatopic',
> 'properties.bootstrap.servers'='localhost:9092',
> 'properties.group.id'='testGroup',
> 'scan.startup.mode'='earliest-offset',
> 'format'='json',
> 'path-filter'='$[?(@.source=="order")]',
> 'path-data'='$.content'
> );
>
> Both path-filter and path-data would accept JsonPath expressions
> ([https://github.com/json-path/JsonPath]).
>
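To make the requested behavior concrete, here is a minimal Python sketch of what the two proposed options would do to each Kafka message: first drop records whose "source" field does not match, then keep only the nested "content" object as the row. It is an illustration only, not Flink code. The helper name select_rows and its parameters are hypothetical, and plain dictionary access stands in for real JsonPath evaluation.

```python
import json

# Sample messages copied from the issue description (a subset).
RAW_MESSAGES = [
    '{"id":"121","source":"users","content":{"name":"test01","age":20,"addr":"addr1"}}',
    '{"id":"124","source":"order","content":{"orderId":"100001","price":34,"addr":"addr1231"}}',
    '{"id":"125","source":"order","content":{"orderId":"100002","price":34,"addr":"addr1232"}}',
]


def select_rows(raw_messages, source_name, data_key="content"):
    """Hypothetical helper: emulate the proposed filter and data-path options.

    Keeps only messages whose "source" equals source_name, then returns the
    nested object stored under data_key for each of them.
    """
    rows = []
    for raw in raw_messages:
        record = json.loads(raw)
        # Filter step: stands in for $[?(@.source=="order")]
        if record.get("source") != source_name:
            continue
        # Data-path step: stands in for $.content
        rows.append(record[data_key])
    return rows


order_rows = select_rows(RAW_MESSAGES, "order")
for row in order_rows:
    print(row)
```

With the sample data, only the two "order" messages survive the filter, and each resulting row carries the orderId, price, and addr fields that the table definition would map to columns.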
--
This message was sent by Atlassian Jira
(v8.3.4#803005)