[ https://issues.apache.org/jira/browse/FLINK-18230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Benchao Li closed FLINK-18230.
------------------------------
Resolution: Invalid
> Table API has no Functions like sparkSQL explode
> -------------------------------------------------
>
> Key: FLINK-18230
> URL: https://issues.apache.org/jira/browse/FLINK-18230
> Project: Flink
> Issue Type: Improvement
> Reporter: mzz
> Priority: Major
>
> streamTableEnvironment.connect(new Kafka()
>     .topic(TOPIC)
>     .version(VERSION)
>     .startFromEarliest()
>     .property("bootstrap.servers", "172.16.30.207:9092")
>     .property("group.id", "km_aggs_group_3")
>   )
>   .withFormat(
>     new Json()
>       .failOnMissingField(true)
>       .deriveSchema()
>   )
>   .withSchema(new Schema()
>     .field("devs", Types.STRING())
>     .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
>       Types.ROW(Array("count", "eventid", "sid", "params"),
>         Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
>           Types.ROW(Array("adid", "adtype", "ecpm"),
>             Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
>     ))
>     .field("identity", Types.STRING())
>     .field("ip", Types.STRING())
>     .field("launchs", Types.STRING())
>     .field("ts", Types.STRING())
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> val tableResult = streamTableEnvironment.sqlQuery("select ip, ts, launchs, advs[1].`count` from aggs_test")
> tableResult.printSchema()
> streamTableEnvironment.toAppendStream[Row](tableResult).print()