[ 
https://issues.apache.org/jira/browse/FLINK-18230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li closed FLINK-18230.
------------------------------
    Resolution: Invalid

> Table API has no functions like Spark SQL explode
> -------------------------------------------------
>
>                 Key: FLINK-18230
>                 URL: https://issues.apache.org/jira/browse/FLINK-18230
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: mzz
>            Priority: Major
>
> streamTableEnvironment.connect(new Kafka()
>       .topic(TOPIC)
>       .version(VERSION)
>       .startFromEarliest()
>       .property("bootstrap.servers", "172.16.30.207:9092")
>       .property("group.id", "km_aggs_group_3")
>     )
>       .withFormat(
>         new Json()
>           .failOnMissingField(true)
>           .deriveSchema()
>       )
>       .withSchema(new Schema()
>         .field("devs", Types.STRING())
>         .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass,
>             Types.ROW(Array("count", "eventid", "sid", "params"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
>               Types.ROW(Array("adid", "adtype", "ecpm"), 
> Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
>       ))
>         .field("identity", Types.STRING())
>         .field("ip", Types.STRING())
>         .field("launchs", Types.STRING())
>         .field("ts", Types.STRING())
>       )
>       .inAppendMode()
>       .registerTableSource("aggs_test")
>     val tableResult = streamTableEnvironment.sqlQuery("select ip, ts,launchs,advs[1].`count` from aggs_test")
>     tableResult.printSchema()
>     streamTableEnvironment.toAppendStream[Row](tableResult).print()



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to