[ 
https://issues.apache.org/jira/browse/FLINK-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000418#comment-16000418
 ] 

Fabian Hueske commented on FLINK-6476:
--------------------------------------

Hi [~rtudoran],

you are right, it should be possible to convert a DataStream<Row> (or 
DataSet<Row>) into a Table.
As you noticed, TableEnvironment.getFieldInfo() lacks support for 
RowTypeInfo.

Another issue is that it is not possible to reliably extract the 
TypeInformation for Row (neither from signatures nor from object instances). 
By default, Flink will use a GenericType<Row> for a DataSet<Row> or 
DataStream<Row>. Since GenericType<Row> does not contain any information about 
the fields of a Row, it cannot be used to create a Table. (This issue was 
addressed in FLINK-6059.)
Therefore, users must manually specify a RowTypeInfo to create a Table.

The Java API offers the DataStream.returns() method to hint types. In Scala, we 
use implicit types to override the automatically extracted type:

{code}
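import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.types.Row
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = List(Row.of("Hello", "Worlds", Int.box(1)))
// tpe is automatically chosen when an implicit value of type
// TypeInformation[Row] is requested.
implicit val tpe: TypeInformation[Row] =
  new RowTypeInfo(Types.STRING, Types.STRING, Types.INT)
val stream = env.fromCollection(data)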
{code}
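
For the Java API, a minimal sketch of the returns() type hint could look like 
the following. The class name RowTypeHintExample, the method rowStream() and 
the sample data are made up for illustration. I am assuming the hint is given 
directly on the operator that produces the Rows.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

// Hypothetical example class, not part of Flink.
public class RowTypeHintExample {

    // Builds a DataStream<Row> that carries a RowTypeInfo as its type.
    public static DataStream<Row> rowStream(StreamExecutionEnvironment env) {
        // Field types of the Row, in order: String, String, Integer.
        RowTypeInfo rowType = new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO);

        return env
            .fromElements("Hello", "World")
            .map(new MapFunction<String, Row>() {
                @Override
                public Row map(String word) {
                    return Row.of(word, word.toUpperCase(), word.length());
                }
            })
            // Manually specify the RowTypeInfo so that the field
            // information of the Row is available downstream.
            .returns(rowType);
    }
}
```

Calling getType() on the returned stream should then yield the RowTypeInfo 
with three fields.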




> Table environment register row data stream
> ------------------------------------------
>
>                 Key: FLINK-6476
>                 URL: https://issues.apache.org/jira/browse/FLINK-6476
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>         Environment: java/scala
>            Reporter: radu
>            Assignee: radu
>              Labels: feature, patch
>
> Registering streams of Row as a table source is currently not possible:
> Java:
> DataStream<Row> ds = ...
> tableEnv.registerDataStream("MyTableRow", ds, "a, b, c ...");
> org.apache.flink.table.api.TableException: Source of type Row(f0: Integer, 
> f1: Long, f2: Integer, f3: String, f4: Integer) cannot be converted into 
> Table.
>       at 
> org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:680)
>       at 
> org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:363)
>       at 
> org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:133)
>       at 
> org.apache.flink.table.api.java.stream.sql.SqlITCase.testRow2(SqlITCase.java:92)
> Scala:
> val ds:DataStream[Row] = ...
> tableEnv.registerDataStream("MyTableRow", ds, "a, b, c, d, e");
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> org.apache.flink.api.common.typeutils.CompositeType
> This can be supported by extending getFieldInfo() in 
> org.apache.flink.table.api.TableEnvironment
> and by constructing the StreamTableSource accordingly.



