[ 
https://issues.apache.org/jira/browse/FLINK-18230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130110#comment-17130110
 ] 

Benchao Li edited comment on FLINK-18230 at 6/10/20, 6:06 AM:
--------------------------------------------------------------

[~mzz_q] In Flink, you can do it like this:
{code:java}
SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) 
{code}
and you can reference the doc[1] here.

 [1] 
[https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins]


was (Author: libenchao):
[~mzz_q] In Flink, you can do it like this:

{{}}
{code:java}
SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) 
{code}
and you can reference the doc[1] here.

 [1] 
[https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins]

> Table API has no Functions like  sparkSQL explode
> -------------------------------------------------
>
>                 Key: FLINK-18230
>                 URL: https://issues.apache.org/jira/browse/FLINK-18230
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: mzz
>            Priority: Major
>
> streamTableEnvironment.connect(new Kafka()
>       .topic(TOPIC)
>       .version(VERSION)
>       .startFromEarliest()
>       .property("bootstrap.servers", "172.16.30.207:9092")
>       .property("group.id", "km_aggs_group_3")
>     )
>       .withFormat(
>         new Json()
>           .failOnMissingField(true)
>           .deriveSchema()
>       )
>       .withSchema(new Schema()
>         .field("devs", Types.STRING())
>         .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass,
>             Types.ROW(Array("count", "eventid", "sid", "params"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
>               Types.ROW(Array("adid", "adtype", "ecpm"), 
> Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
>       ))
>         .field("identity", Types.STRING())
>         .field("ip", Types.STRING())
>         .field("launchs", Types.STRING())
>         .field("ts", Types.STRING())
>       )
>       .inAppendMode()
>       .registerTableSource("aggs_test")
>     val tableResult = streamTableEnvironment.sqlQuery("select ip, 
> ts,launchs,advs[1].`count` from aggs_test")
>     tableResult.printSchema()
>     streamTableEnvironment.toAppendStream[Row](tableResult).print()



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to