[ https://issues.apache.org/jira/browse/FLINK-16327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-16327:
-----------------------------------
Labels: pull-request-available (was: )
> Add TableEnvironment.fromElements interfaces for usability
> ----------------------------------------------------------
>
> Key: FLINK-16327
> URL: https://issues.apache.org/jira/browse/FLINK-16327
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Affects Versions: 1.11.0
> Reporter: Zhenghua Gao
> Priority: Major
> Labels: pull-request-available
>
> h1. Interface
> {code:java}
> /**
>  * Creates a table from a group of objects (known as its elements). The
>  * schema of the table is inferred from the type of the elements.
>  *
>  * @param data a group of objects
>  */
> Table fromElements(Collection<?> data);
>
> /**
>  * Creates a table from a group of objects (known as its elements). The
>  * schema of the table is inferred from the passed-in data type.
>  *
>  * @param data a group of objects
>  * @param dataType the data type of the data
>  */
> Table fromElements(Collection<?> data, DataType dataType);
> {code}
> h1. Use Case
> * One potential use case for Table API
> {code:scala}
> import java.lang.{Double => JDouble, Integer => JInt}
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.DataTypes
> import org.apache.flink.table.api.scala._
> import org.apache.flink.types.Row
>
> import scala.collection.JavaConverters._
> import scala.collection.mutable
>
> // StreamITCase is Flink's test sink utility; fromElements is the proposed API.
> @Test
> def testUnregisteredCollectionSource1(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>
>   tEnv.fromElements(data.asJava)
>     .as('first, 'id, 'score, 'last)
>     .where('id > 4)
>     .select('last, 'score * 2)
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>
>   env.execute()
> }
>
> @Test
> def testUnregisteredCollectionSource2(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   val dataType = DataTypes.ROW(
>     DataTypes.FIELD("first", DataTypes.STRING()),
>     DataTypes.FIELD("id", DataTypes.INT()),
>     DataTypes.FIELD("score", DataTypes.DOUBLE()),
>     DataTypes.FIELD("last", DataTypes.STRING()))
>
>   tEnv.fromElements(data.asJava, dataType)
>     .where('id > 4)
>     .select('last, 'score * 2)
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>
>   env.execute()
> }
> {code}
> * One potential use case for SQL
> {code:scala}
> // Imports as in the Table API example above.
>
> @Test
> def testUnregisteredCollectionSource1(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>
>   val table = tEnv.fromElements(data.asJava).as('first, 'id, 'score, 'last)
>
>   tEnv.createTemporaryView("T", table)
>   tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>
>   env.execute()
> }
>
> @Test
> def testUnregisteredCollectionSource2(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   val dataType = DataTypes.ROW(
>     DataTypes.FIELD("first", DataTypes.STRING()),
>     DataTypes.FIELD("id", DataTypes.INT()),
>     DataTypes.FIELD("score", DataTypes.DOUBLE()),
>     DataTypes.FIELD("last", DataTypes.STRING()))
>
>   val table = tEnv.fromElements(data.asJava, dataType)
>   tEnv.createTemporaryView("T", table)
>   tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>
>   env.execute()
> }
> {code}
> h1. Proposal
> * data type inference
> For the first interface we need to infer the data type from the data itself.
> A potential tool is the DataTypeExtractor, but it doesn't support Scala
> tuples, Row, etc. For the types that are most common in our test cases (Row
> and Scala tuples), we could enumerate them and recursively traverse the
> underlying objects to derive the types of all fields. This covers most cases
> and improves usability. A minimal sketch of such a traversal follows.
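> A minimal sketch of the recursive traversal, assuming inference looks at a
> single element and only a handful of field types; {{inferDataType}} is a
> hypothetical helper, not part of the proposed API:
> {code:scala}
> import org.apache.flink.table.api.DataTypes
> import org.apache.flink.table.types.DataType
> import org.apache.flink.types.Row
>
> // Illustrative only: derive a DataType by recursively inspecting one element.
> def inferDataType(value: AnyRef): DataType = value match {
>   case _: java.lang.Integer => DataTypes.INT()
>   case _: java.lang.Double  => DataTypes.DOUBLE()
>   case _: String            => DataTypes.STRING()
>   case row: Row =>
>     // Recurse into each field of the Row; field names are synthesized.
>     val fields = (0 until row.getArity).map { i =>
>       DataTypes.FIELD(s"f$i", inferDataType(row.getField(i)))
>     }
>     DataTypes.ROW(fields: _*)
>   case p: Product =>
>     // Scala tuples (and case classes) expose their fields via Product.
>     val fields = (0 until p.productArity).map { i =>
>       DataTypes.FIELD(s"_${i + 1}",
>         inferDataType(p.productElement(i).asInstanceOf[AnyRef]))
>     }
>     DataTypes.ROW(fields: _*)
>   case other =>
>     throw new IllegalArgumentException(s"Unsupported type: ${other.getClass}")
> }
> {code}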
> * proposed changes
> ** A CollectionQueryOperation which implements QueryOperation to describe
> the relational operation (a sketch follows this list)
> ** The logical and physical RelNodes for the legacy planner; in the physical
> node we can translate the data to a DataStream
> ** The logical and physical RelNodes for the Blink planner; in the physical
> node we can translate the data to a Transformation
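> A rough sketch of the first item, assuming the QueryOperation interface as it
> exists in the current codebase (getTableSchema, getChildren, asSummaryString,
> accept); the class body is illustrative, not the final implementation:
> {code:scala}
> import java.util.{Collection => JCollection, Collections, List => JList}
>
> import org.apache.flink.table.api.TableSchema
> import org.apache.flink.table.operations.{QueryOperation, QueryOperationVisitor}
>
> // Illustrative sketch: a leaf QueryOperation that carries the elements and
> // the (inferred or user-supplied) schema through the planner.
> class CollectionQueryOperation(
>     val elements: JCollection[_],
>     schema: TableSchema)
>   extends QueryOperation {
>
>   override def getTableSchema: TableSchema = schema
>
>   override def asSummaryString: String =
>     s"Collection(elements: ${elements.size()})"
>
>   // The data itself is the source, so there are no child operations.
>   override def getChildren: JList[QueryOperation] =
>     Collections.emptyList[QueryOperation]()
>
>   override def accept[T](visitor: QueryOperationVisitor[T]): T =
>     visitor.visit(this)
> }
> {code}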
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)