GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159771384
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
---
@@ -885,8 +935,23 @@ abstract class TableEnvironment(val config: TableConfig) {
"Field reference expression or alias on field expression expected.")
}
- case tpe => throw new TableException(
- s"Source of type $tpe cannot be converted into Table.")
+ case _: TypeInformation[_] => // atomic or other custom type information
+ var referenced = false
+ exprs flatMap {
+ case _: TimeAttribute =>
+ None
+ case UnresolvedFieldReference(_) if referenced =>
+ // only accept the first field for an atomic type
+ throw new TableException("Only the first field can reference an atomic type.")
+ case UnresolvedFieldReference(name: String) =>
+ referenced = true
+ // first field reference is mapped to atomic type
+ Some((0, name))
+ case Alias(UnresolvedFieldReference(_), name: String, _) =>
--- End diff --
Should we support this case? It would allow an arbitrary name to be used as a
reference. For example, for a String type (which has no field name), we could
define the schema as `('a as 'b)`. The field would be named `'b`, but what
would `'a` point to?
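
To make the concern concrete, here is a minimal sketch of one possible alternative: reject aliases for atomic types, since there is no existing field name for the aliased reference to resolve to. `Expr`, `FieldRef`, `Alias`, and `AtomicFieldMapping` are simplified, hypothetical stand-ins, not the actual Table API classes, and the use of `IllegalArgumentException` is only an assumption for illustration.

```scala
// Hypothetical, simplified stand-ins for the Table API expression classes.
sealed trait Expr
case class FieldRef(name: String) extends Expr
case class Alias(child: Expr, name: String) extends Expr

object AtomicFieldMapping {
  // Maps field expressions for an atomic type to (index, name) pairs.
  // An atomic type has exactly one unnamed field, at index 0.
  def fieldInfo(exprs: Seq[Expr]): Seq[(Int, String)] = {
    var referenced = false
    exprs.map {
      case FieldRef(_) if referenced =>
        // only the first field reference may name the atomic value
        throw new IllegalArgumentException(
          "Only the first field can reference an atomic type.")
      case FieldRef(name) =>
        referenced = true
        // the first field reference names the atomic value
        (0, name)
      case Alias(FieldRef(origin), _) =>
        // an atomic type has no field called `origin`, so the alias has no target
        throw new IllegalArgumentException(
          s"Alias on '$origin is not supported for an atomic type.")
      case other =>
        throw new IllegalArgumentException(s"Unsupported field expression: $other")
    }
  }
}
```

With this variant, `'b` alone would name the field, while `('a as 'b)` would fail with an explicit error instead of silently ignoring `'a`.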
---