[ https://issues.apache.org/jira/browse/FLINK-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000418#comment-16000418 ]
Fabian Hueske commented on FLINK-6476:
--------------------------------------

Hi [~rtudoran],

you are right, it should be possible to convert a DataStream<Row> (or DataSet<Row>) into a Table. As you noticed, TableEnvironment.getFieldInfo() lacks support for RowTypeInfo.

Another issue is that it is not possible to reliably extract the TypeInformation for Row (neither from signatures nor from object instances). By default, Flink will use a GenericType<Row> for a DataSet<Row> or DataStream<Row>. Since GenericType<Row> does not contain any information about the fields of a Row, it cannot be used to create a Table. (This issue was addressed in FLINK-6059.)

Therefore, users must manually specify a RowTypeInfo to create a Table. The Java API offers the DataStream.returns() method to hint types. In Scala, we use implicit types to override the automatically extracted type:

{code}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = List(Row.of("Hello", "Worlds", Int.box(1)))

implicit val tpe: TypeInformation[Row] = new RowTypeInfo(Types.STRING, Types.STRING, Types.INT)
// tpe is automatically chosen when an implicit value of type TypeInformation[Row] is requested.
val stream = env.fromCollection(data)
{code}

> Table environment register row data stream
> ------------------------------------------
>
>                 Key: FLINK-6476
>                 URL: https://issues.apache.org/jira/browse/FLINK-6476
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>         Environment: java/scala
>            Reporter: radu
>            Assignee: radu
>              Labels: feature, patch
>
> Registering streams of Row as a table source is currently not possible:
> Java:
> DataStream<Row> ds = ...
> tableEnv.registerDataStream("MyTableRow", ds, "a, b, c ...");
> org.apache.flink.table.api.TableException: Source of type Row(f0: Integer, f1: Long, f2: Integer, f3: String, f4: Integer) cannot be converted into Table.
>     at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:680)
>     at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:363)
>     at org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:133)
>     at org.apache.flink.table.api.java.stream.sql.SqlITCase.testRow2(SqlITCase.java:92)
> Scala:
> val ds:DataStream[Row] = ...
> tableEnv.registerDataStream("MyTableRow", ds, "a, b, c, d, e");
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.common.typeutils.CompositeType
> This can be supported by extending getFieldInfo() in org.apache.flink.table.api.TableEnvironment and by constructing the StreamTableSource correspondingly.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)