[ https://issues.apache.org/jira/browse/FLINK-16327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-16327:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add TableEnvironment.fromElements interfaces for usability
> ----------------------------------------------------------
>
>                 Key: FLINK-16327
>                 URL: https://issues.apache.org/jira/browse/FLINK-16327
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.11.0
>            Reporter: Zhenghua Gao
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Interface
> {code:java}
> /**
>  * Creates a table from a collection of objects (known as its elements).
>  * The schema of the table is inferred from the type of the elements.
>  *
>  * @param data a collection of objects
>  */
> Table fromElements(Collection<?> data);
>
> /**
>  * Creates a table from a collection of objects (known as its elements).
>  * The schema of the table is inferred from the passed-in data type.
>  *
>  * @param data a collection of objects
>  * @param dataType the data type of the elements
>  */
> Table fromElements(Collection<?> data, DataType dataType);
> {code}
> h1. Use Case
>  * One potential use case for the Table API
> {code:java}
> @Test 
> def testUnregisteredCollectionSource1(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   
>   tEnv.fromElements(data.asJava)
>     .as('first, 'id, 'score, 'last)
>     .where('id > 4)
>     .select('last, 'score * 2)
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> @Test 
> def testUnregisteredCollectionSource2(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   val dataType = DataTypes.ROW(
>     DataTypes.FIELD("first", DataTypes.STRING()),
>     DataTypes.FIELD("id", DataTypes.INT()),
>     DataTypes.FIELD("score", DataTypes.DOUBLE()),
>     DataTypes.FIELD("last", DataTypes.STRING()))
>   tEnv.fromElements(data.asJava, dataType)
>     .where('id > 4)
>     .select('last, 'score * 2)
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> {code}
>  * One potential use case for SQL
> {code:java}
> @Test 
> def testUnregisteredCollectionSource1(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   
>   val table = tEnv.fromElements(data.asJava).as('first, 'id, 'score, 'last)
>   
>   tEnv.createTemporaryView("T", table)
>   tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
>       .toAppendStream[Row]
>       .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> @Test 
> def testUnregisteredCollectionSource2(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   val dataType = DataTypes.ROW(
>     DataTypes.FIELD("first", DataTypes.STRING()),
>     DataTypes.FIELD("id", DataTypes.INT()),
>     DataTypes.FIELD("score", DataTypes.DOUBLE()),
>     DataTypes.FIELD("last", DataTypes.STRING()))
>   val table = tEnv.fromElements(data.asJava, dataType)
>   tEnv.createTemporaryView("T", table)
>   tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
>       .toAppendStream[Row]
>       .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> {code}
> h1. The proposal
>  * Data type inference
> For the first interface we need to infer the data type from the data itself. A
> potential tool is the DataTypeExtractor, but it does not support Scala tuples,
> Row, etc. For the types that appear most often in our test cases (Row and
> Scala tuples), we could enumerate the supported classes and recursively
> traverse the underlying objects to derive the types of their fields. This
> would cover most cases and improve usability.
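The recursive traversal described above might look roughly like the following. This is a simplified, Flink-free sketch: `Row` here is a stand-in for Flink's Row, and `inferTypeName` returns type names as plain strings where a real implementation would build DataType instances and delegate leaf classes to the DataTypeExtractor. All names are illustrative, not part of the proposed API.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Simplified sketch of recursive type inference over nested element structures. */
public class TypeInferenceSketch {

    /** Stand-in for Flink's Row: an ordered list of (possibly nested) field values. */
    static final class Row {
        final Object[] fields;
        Row(Object... fields) { this.fields = fields; }
    }

    /**
     * Recursively derives a type name for a value. A real implementation would
     * return DataType instances and fall back to DataTypeExtractor for leaves.
     */
    static String inferTypeName(Object value) {
        if (value instanceof Row) {
            // Recurse into each field of the row and build a ROW<...> type.
            String fieldTypes = Arrays.stream(((Row) value).fields)
                    .map(TypeInferenceSketch::inferTypeName)
                    .collect(Collectors.joining(", "));
            return "ROW<" + fieldTypes + ">";
        } else if (value instanceof String) {
            return "STRING";
        } else if (value instanceof Integer) {
            return "INT";
        } else if (value instanceof Double) {
            return "DOUBLE";
        }
        throw new IllegalArgumentException("Unsupported element: " + value);
    }

    public static void main(String[] args) {
        Row row = new Row("Mike", 5, 12.3, "Smith");
        // The example row from the use cases above.
        System.out.println(inferTypeName(row)); // ROW<STRING, INT, DOUBLE, STRING>
    }
}
```

The same recursion would extend to scala.Tuple by enumerating its product arity; nested Rows fall out of the recursion for free.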
>  * Proposed changes
>  ** A CollectionQueryOperation that implements QueryOperation and describes
> the relational operation
>  ** Logical and physical RelNodes for the legacy planner; in the physical
> node, the data is translated into a DataStream
>  ** Logical and physical RelNodes for the Blink planner; in the physical
> node, the data is translated into a Transformation
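To make the first proposed change concrete, a minimal sketch of what a CollectionQueryOperation might hold is shown below. The code is self-contained and hypothetical: the nested QueryOperation interface only mirrors the shape of Flink's org.apache.flink.table.operations.QueryOperation, and the data type is represented as a string where real code would use DataType.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the proposed CollectionQueryOperation. */
public class CollectionQueryOperationSketch {

    /** Stand-in mirroring the shape of Flink's QueryOperation interface. */
    interface QueryOperation {
        String asSummaryString();
        List<QueryOperation> getChildren();
    }

    /** A leaf operation that carries the client-side elements and their type. */
    static final class CollectionQueryOperation implements QueryOperation {
        private final List<?> elements;
        private final String dataType; // real code would use DataType

        CollectionQueryOperation(List<?> elements, String dataType) {
            this.elements = elements;
            this.dataType = dataType;
        }

        List<?> getElements() { return elements; }

        @Override
        public String asSummaryString() {
            return "CollectionQueryOperation(elements=" + elements.size()
                    + ", type=" + dataType + ")";
        }

        @Override
        public List<QueryOperation> getChildren() {
            // A collection source is a leaf of the operation tree.
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        CollectionQueryOperation op = new CollectionQueryOperation(
                List.of("Mike"), "ROW<STRING>");
        System.out.println(op.asSummaryString());
    }
}
```

The planner-specific RelNodes would then consume `getElements()` when translating the leaf into a DataStream (legacy planner) or a Transformation (Blink planner).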
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)