[ https://issues.apache.org/jira/browse/FLINK-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16001452#comment-16001452 ]
ASF GitHub Bot commented on FLINK-6476: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3849#discussion_r115339202 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -677,6 +678,23 @@ abstract class TableEnvironment(val config: TableConfig) { case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } + case r: RowTypeInfo => { + // r.getFieldNames().map(name => + // (r.getFieldIndex(name),name)) + exprs.zipWithIndex flatMap { + case (UnresolvedFieldReference(name), idx) => + Some((idx, name)) + case (Alias(UnresolvedFieldReference(origName), name, _), _) => + val idx = r.getFieldIndex(origName) + if (idx < 0) { + throw new TableException(s"$origName is not a field of type $r") + } + Some((idx, name)) + case _ => throw new TableException( --- End diff -- we need to add the `TimeAttribute` case here as well: ``` case _: TimeAttribute => None ``` > Table environment register row data stream > ------------------------------------------ > > Key: FLINK-6476 > URL: https://issues.apache.org/jira/browse/FLINK-6476 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Environment: java/scala > Reporter: radu > Assignee: radu > Labels: feature, patch > > Registering as table source streams with Row is currently not possible: > Java: > DataStream<Row> ds = ... > tableEnv.registerDataStream("MyTableRow", ds, "a, b, c ..."); > org.apache.flink.table.api.TableException: Source of type Row(f0: Integer, > f1: Long, f2: Integer, f3: String, f4: Integer) cannot be converted into > Table. > at > org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:680) > at > org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:363) > at > org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:133) > at > org.apache.flink.table.api.java.stream.sql.SqlITCase.testRow2(SqlITCase.java:92) > Scala: > val ds:DataStream[Row] = ... > tableEnv.registerDataStream("MyTableRow", ds, "a, b, c, d, e"); > org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to > org.apache.flink.api.common.typeutils.CompositeType > This can be supported by extending the in the > org.apache.flink.table.api.TableEnvironment > getFieldInfo() > and by constructing the StreamTableSource correspondingly -- This message was sent by Atlassian JIRA (v6.3.15#6346)