[
https://issues.apache.org/jira/browse/FLINK-18230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130110#comment-17130110
]
Benchao Li edited comment on FLINK-18230 at 6/10/20, 6:06 AM:
--------------------------------------------------------------
[~mzz_q] In Flink, you can do it like this:
{code:sql}
SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
{code}
For details, see the Joins section of the SQL queries documentation [1].
[1] [https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins]
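Applied to the aggs_test table from the issue description, the same pattern might look like the sketch below. It assumes advs is registered as an array of rows with the fields count, eventid, sid, and params, as in the schema in the description. The alias t and the selected columns are illustrative, and the query has not been run against this job.

{code:sql}
-- One output row per element of advs. The column aliases follow the order of
-- the ROW fields declared in the schema (assumption: UNNEST expands each ROW
-- element into its fields).
SELECT ip, ts, launchs, t.`count`, t.eventid
FROM aggs_test
CROSS JOIN UNNEST(advs) AS t (`count`, eventid, sid, params)
{code}

Unlike advs[1].`count`, which reads only the first element of the array, this emits a row for every element, which is roughly what explode does in Spark SQL.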
> Table API has no Functions like sparkSQL explode
> -------------------------------------------------
>
> Key: FLINK-18230
> URL: https://issues.apache.org/jira/browse/FLINK-18230
> Project: Flink
> Issue Type: Improvement
> Reporter: mzz
> Priority: Major
>
> streamTableEnvironment.connect(new Kafka()
>     .topic(TOPIC)
>     .version(VERSION)
>     .startFromEarliest()
>     .property("bootstrap.servers", "172.16.30.207:9092")
>     .property("group.id", "km_aggs_group_3")
>   )
>   .withFormat(
>     new Json()
>       .failOnMissingField(true)
>       .deriveSchema()
>   )
>   .withSchema(new Schema()
>     .field("devs", Types.STRING())
>     .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
>       Types.ROW(Array("count", "eventid", "sid", "params"),
>         Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
>           Types.ROW(Array("adid", "adtype", "ecpm"),
>             Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
>     ))
>     .field("identity", Types.STRING())
>     .field("ip", Types.STRING())
>     .field("launchs", Types.STRING())
>     .field("ts", Types.STRING())
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> val tableResult = streamTableEnvironment.sqlQuery("select ip, ts, launchs, advs[1].`count` from aggs_test")
> tableResult.printSchema()
> streamTableEnvironment.toAppendStream[Row](tableResult).print()