[
https://issues.apache.org/jira/browse/FLINK-16327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17052004#comment-17052004
]
Aljoscha Krettek commented on FLINK-16327:
------------------------------------------
I believe [~dwysakowicz] is already working on this as part of a FLIP.
Unfortunately, I don't know the FLIP number; maybe [~twalthr] can help with
this? Both Timo and Dawid are on vacation right now, but Timo should be back
next week.
I'm closing this for now to keep our Jira free of duplicates, since
implementation work is already ongoing. Please re-open if it turns out there is
no Jira/FLIP for this yet.
> Add TableEnvironment.fromElements interfaces for usability
> ----------------------------------------------------------
>
> Key: FLINK-16327
> URL: https://issues.apache.org/jira/browse/FLINK-16327
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Affects Versions: 1.11.0
> Reporter: Zhenghua Gao
> Priority: Major
>
> h1. Interface
> {code:java}
> /**
>  * Creates a table from a group of objects (known as its elements). The schema of the table
>  * is inferred from the type of the elements.
>  *
>  * @param data a group of objects
>  */
> Table fromElements(Collection<?> data);
>
> /**
>  * Creates a table from a group of objects (known as its elements). The schema of the table
>  * is derived from the passed-in data type.
>  *
>  * @param data a group of objects
>  * @param dataType the data type of the data
>  */
> Table fromElements(Collection<?> data, DataType dataType);
> {code}
> h1. Use Case
> * One potential use case for the Table API
> {code:scala}
> import java.lang.{Double => JDouble, Integer => JInt}
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.DataTypes
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.runtime.utils.StreamITCase
> import org.apache.flink.types.Row
> import org.junit.Test
>
> import scala.collection.JavaConverters._
> import scala.collection.mutable
>
> @Test
> def testUnregisteredCollectionSource1(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>
>   // A Row carries no field names, so they are assigned with as(...)
>   tEnv.fromElements(data.asJava)
>     .as('first, 'id, 'score, 'last)
>     .where('id > 4)
>     .select('last, 'score * 2)
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
>
> @Test
> def testUnregisteredCollectionSource2(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   // The explicit data type supplies both the field names and the field types
>   val dataType = DataTypes.ROW(
>     DataTypes.FIELD("first", DataTypes.STRING()),
>     DataTypes.FIELD("id", DataTypes.INT()),
>     DataTypes.FIELD("score", DataTypes.DOUBLE()),
>     DataTypes.FIELD("last", DataTypes.STRING()))
>   tEnv.fromElements(data.asJava, dataType)
>     .where('id > 4)
>     .select('last, 'score * 2)
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> {code}
> * One potential use case for SQL
> {code:scala}
> import java.lang.{Double => JDouble, Integer => JInt}
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.DataTypes
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.runtime.utils.StreamITCase
> import org.apache.flink.types.Row
> import org.junit.Test
>
> import scala.collection.JavaConverters._
> import scala.collection.mutable
>
> @Test
> def testUnregisteredCollectionSource1(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>
>   val table = tEnv.fromElements(data.asJava).as('first, 'id, 'score, 'last)
>
>   tEnv.createTemporaryView("T", table)
>   tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
>
> @Test
> def testUnregisteredCollectionSource2(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = StreamTableEnvironment.create(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val data = Seq(
>     Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))
>   val dataType = DataTypes.ROW(
>     DataTypes.FIELD("first", DataTypes.STRING()),
>     DataTypes.FIELD("id", DataTypes.INT()),
>     DataTypes.FIELD("score", DataTypes.DOUBLE()),
>     DataTypes.FIELD("last", DataTypes.STRING()))
>   val table = tEnv.fromElements(data.asJava, dataType)
>   tEnv.createTemporaryView("T", table)
>   tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
>     .toAppendStream[Row]
>     .addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> {code}
> h1. The proposal
> * data type inference
> For the first interface, we need to infer the data type from the data itself.
> One potential tool is the DataTypeExtractor, but it doesn't support Scala
> tuples, Row, etc. For Row and Scala tuples, the types most commonly used in our
> test cases, we could enumerate the supported field types and recursively
> traverse the underlying objects to derive their types. This should cover most
> cases and improve usability.
> * proposed changes
> ** A CollectionQueryOperation, implementing QueryOperation, that describes the relational operation
> ** Logical and physical RelNodes for the legacy planner; the physical node translates the data into a DataStream
> ** Logical and physical RelNodes for the Blink planner; the physical node translates the data into a Transformation
>
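The recursive traversal proposed under "data type inference" could look roughly like the Java sketch that follows. It is a hypothetical illustration and is neither Flink code nor the FLIP's design. `Object[]` stands in for `Row` or a Scala tuple. The `ElementTypeInference` and `Type` classes and the class-to-type-name mapping are invented here; the type names only mirror Flink SQL types such as STRING, INT and DOUBLE, and the sketch does not call `DataTypeExtractor` or `DataTypes`. The sketch infers a type for each element by recursing into nested containers. It then merges the per-element types so that one schema fits the whole collection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: Object[] stands in for positional containers such
// as Flink's Row or Scala tuples, and the Type class below replaces Flink's DataType.
final class ElementTypeInference {
    /** A minimal type tree; ROW fields are printed as f0, f1, ... (an assumed naming). */
    static final class Type {
        final String name;       // e.g. "STRING", "INT", "ROW", or "NULL" for unknown
        final List<Type> fields; // empty for atomic types

        Type(String name, List<Type> fields) {
            this.name = name;
            this.fields = fields;
        }

        @Override
        public String toString() {
            if (!name.equals("ROW")) {
                return name;
            }
            StringBuilder sb = new StringBuilder("ROW<");
            for (int i = 0; i < fields.size(); i++) {
                sb.append(i > 0 ? ", " : "").append('f').append(i).append(' ').append(fields.get(i));
            }
            return sb.append('>').toString();
        }
    }

    static Type atomic(String name) {
        return new Type(name, new ArrayList<>());
    }

    // Assumed, non-exhaustive mapping from boxed Java classes to SQL-style type names.
    private static final Map<Class<?>, String> ATOMIC = new HashMap<>();
    static {
        ATOMIC.put(String.class, "STRING");
        ATOMIC.put(Boolean.class, "BOOLEAN");
        ATOMIC.put(Integer.class, "INT");
        ATOMIC.put(Long.class, "BIGINT");
        ATOMIC.put(Double.class, "DOUBLE");
    }

    /** Infers the type of one value, recursing into nested positional containers. */
    static Type inferValue(Object value) {
        if (value == null) {
            return atomic("NULL"); // a null field carries no type information on its own
        }
        if (value instanceof Object[]) {
            List<Type> fieldTypes = new ArrayList<>();
            for (Object field : (Object[]) value) {
                fieldTypes.add(inferValue(field));
            }
            return new Type("ROW", fieldTypes);
        }
        String name = ATOMIC.get(value.getClass());
        if (name == null) {
            throw new IllegalArgumentException("Unsupported class: " + value.getClass().getName());
        }
        return atomic(name);
    }

    /** Combines the types of two elements; NULL is compatible with any type. */
    static Type merge(Type a, Type b) {
        if (a.name.equals("NULL")) return b;
        if (b.name.equals("NULL")) return a;
        if (!a.name.equals(b.name) || a.fields.size() != b.fields.size()) {
            throw new IllegalArgumentException("Incompatible element types: " + a + " vs " + b);
        }
        List<Type> merged = new ArrayList<>();
        for (int i = 0; i < a.fields.size(); i++) {
            merged.add(merge(a.fields.get(i), b.fields.get(i)));
        }
        return new Type(a.name, merged);
    }

    /** Derives one type that fits every element of the collection. */
    static Type infer(Collection<?> data) {
        if (data.isEmpty()) {
            throw new IllegalArgumentException("Cannot infer a type from an empty collection");
        }
        Type result = atomic("NULL");
        for (Object element : data) {
            result = merge(result, inferValue(element));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> data = Arrays.asList(
                new Object[] {"Mike", 5, 12.3, "Smith"},
                new Object[] {null, 7, 1.0, "Doe"});
        // prints ROW<f0 STRING, f1 INT, f2 DOUBLE, f3 STRING>
        System.out.println(infer(data));
    }
}
```

The sketch merges types across all elements instead of inspecting only the first one. This lets a null field in one row be resolved by a non-null value in another, at the cost of a full pass over the data. With the second overload, the declared DataType would make this inference step unnecessary.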