GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159877092
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
---
@@ -855,42 +852,26 @@ abstract class TableEnvironment(val config:
TableConfig) {
"An input of GenericTypeInfo<Row> cannot be converted to Table.
" +
"Please specify the type of the input with a RowTypeInfo.")
- case t: TupleTypeInfo[A] =>
- exprs.zipWithIndex flatMap {
- case (UnresolvedFieldReference(name: String), idx) =>
- if (isReferenceByPosition) {
- Some((idx, name))
- } else {
- referenceByName(name, t)
- }
- case (Alias(UnresolvedFieldReference(origName), name: String,
_), _) =>
- val idx = t.getFieldIndex(origName)
- if (idx < 0) {
- throw new TableException(s"$origName is not a field of type
$t. " +
- s"Expected: ${t.getFieldNames.mkString(", ")}")
- }
- Some((idx, name))
- case (_: TimeAttribute, _) =>
- None
- case _ => throw new TableException(
- "Field reference expression or alias on field expression
expected.")
- }
+ case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
+ t.isInstanceOf[CaseClassTypeInfo[A]] ||
t.isInstanceOf[RowTypeInfo] =>
+
+ // determine schema definition mode (by position or by name)
+ val isRefByPos = isReferenceByPosition(t, exprs)
- case c: CaseClassTypeInfo[A] =>
exprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name: String), idx) =>
- if (isReferenceByPosition) {
+ if (isRefByPos) {
--- End diff --
Would moving the `isRefByPos` check outside of the `match` make this easier
to read?
---