[FLINK-4180] [FLINK-4181] [table] add Batch SQL and Stream SQL and Stream Table API examples
This closes #2274. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/123c637e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/123c637e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/123c637e Branch: refs/heads/master Commit: 123c637e804bfdd6569051cf705ec73b5cb95352 Parents: 45df1b2 Author: Jark Wu <wuchong...@alibaba-inc.com> Authored: Thu Jul 21 00:31:01 2016 +0800 Committer: twalthr <twal...@apache.org> Committed: Tue Aug 2 16:24:59 2016 +0200 ---------------------------------------------------------------------- .../flink/examples/java/JavaSQLExample.java | 72 ++++++++++++++++++++ .../flink/examples/scala/StreamSQLExample.scala | 62 +++++++++++++++++ .../examples/scala/StreamTableExample.scala | 58 ++++++++++++++++ .../flink/examples/scala/WordCountSQL.scala | 43 ++++++++++++ 4 files changed, 235 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java new file mode 100644 index 0000000..bbac94a --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.java; + +import org.apache.flink.api.table.Table; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.BatchTableEnvironment; +import org.apache.flink.api.table.TableEnvironment; + +/** + * Simple example that shows how Batch SQL is used in Java: a DataSet of WC objects is registered as a table, aggregated per word with a SQL query, and printed. + */ +public class JavaSQLExample { + + public static class WC { + public String word; + public long frequency; + + // Public no-argument constructor to make it a Flink POJO + public WC() { + + } + + public WC(String word, long frequency) { + this.word = word; + this.frequency = frequency; + } + + @Override + public String toString() { + return "WC " + word + " " + frequency; + } + } + + public static void main(String[] args) throws Exception { + + // set up the batch execution environment and its table environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + + DataSet<WC> input = env.fromElements( + new WC("Hello", 1), + new WC("Ciao", 1), + new WC("Hello", 1)); + + // register the DataSet as table "WordCount" with the fields "word" and "frequency" + tableEnv.registerDataSet("WordCount", input, "word, frequency"); + // run a SQL query on the registered table and retrieve the result as a new Table + Table table = tableEnv.sql( + "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); + + DataSet<WC> result = tableEnv.toDataSet(table, WC.class); + + result.print(); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala new file mode 100644 index 0000000..8eed77d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Simple example demonstrating the use of SQL on stream tables.
+ */ +object StreamSQLExample { + + case class Order(user: Long, product: String, amount: Int) + + def main(args: Array[String]): Unit = { + + // set up the streaming execution environment and its table environment + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val orderA: DataStream[Order] = env.fromCollection(Seq( + Order(1L, "beer", 3), + Order(1L, "diaper", 4), + Order(3L, "rubber", 2))) + + val orderB: DataStream[Order] = env.fromCollection(Seq( + Order(2L, "pen", 3), + Order(2L, "rubber", 3), + Order(4L, "beer", 1))) + + // register the two DataStreams under the names "OrderA" and "OrderB" + tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount) + tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount) + + // union the OrderA rows with amount > 2 and the OrderB rows with amount < 2 + val result = tEnv.sql( + "SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " + + "SELECT STREAM * FROM OrderB WHERE amount < 2") + + result.toDataStream[Order].print() + + env.execute() + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala new file mode 100644 index 0000000..9081f50 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Simple example demonstrating the use of the Table API on stream tables: two Order streams are unioned and only the rows with amount > 2 are kept. + */ +object StreamTableExample { + + case class Order(user: Long, product: String, amount: Int) + + def main(args: Array[String]): Unit = { + + // set up the streaming execution environment and its table environment + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val orderA = env.fromCollection(Seq( + Order(1L, "beer", 3), + Order(1L, "diaper", 4), + Order(3L, "rubber", 2))).toTable(tEnv) + + val orderB = env.fromCollection(Seq( + Order(2L, "pen", 3), + Order(2L, "rubber", 3), + Order(4L, "beer", 1))).toTable(tEnv) + + val result: DataStream[Order] = orderA.unionAll(orderB) + .select('user, 'product, 'amount) + .where('amount > 2) + .toDataStream[Order] + + result.print() + + env.execute() + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala ---------------------------------------------------------------------- diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala new file mode 100644 index 0000000..41efffc --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment + +/** + * Simple example that shows how Batch SQL is used in Scala. NOTE(review): WC declares its second field as count, but the DataSet is registered with 'frequency and the unaliased SUM result is converted back to WC; confirm this naming mismatch is intended. + */ +object WordCountSQL { + case class WC(word: String, count: Int) + + def main(args: Array[String]): Unit = { + + // set up the batch execution environment and its table environment + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) + tEnv.registerDataSet("WordCount", input, 'word, 'frequency) + + val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word") + + table.toDataSet[WC].print() + } +}