sandy du created FLINK-22541: -------------------------------- Summary: add json format filter params Key: FLINK-22541 URL: https://issues.apache.org/jira/browse/FLINK-22541 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.12.0, 1.11.0 Reporter: sandy du
In my case,one kafka topic multiple table data,for example: {"id":"121","source":"users","content":\{"name":"test01","age":20,"addr":"addr1"}} {"id":"122","source":"users","content":\{"name":"test02","age":23,"addr":"addr2"}} {"id":"124","source":"users","content":\{"name":"test03","age":34,"addr":"addr3"}} {"id":"124","source":"order","content":\{"orderId":"100001","price":34,"addr":"addr1231"}} {"id":"125","source":"order","content":\{"orderId":"100002","price":34,"addr":"addr1232"}} {"id":"126","source":"order","content":\{"orderId":"100003","price":34,"addr":"addr1233"}} I just want to consume data from talbe order,flink sql ddl like this: CREATE TABLE order ( orderId STRING, age INT, addr STRING ) with ( 'connector'='kafka', 'topic'='kafkatopic', 'properties.bootstrap.servers'='localhost:9092', 'properties.group.id'='testGroup', 'scan.startup.mode'='earliest-offset', 'format'='json', 'path-fliter'='$[?(@.source=="order")]', 'path-data'='$.content' ); path-fliter and path-data can use JsonPath (https://github.com/json-path/JsonPath) -- This message was sent by Atlassian Jira (v8.3.4#803005)