Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159764095
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
---
@@ -791,92 +824,109 @@ abstract class TableEnvironment(val config:
TableConfig) {
* Returns field names and field positions for a given
[[TypeInformation]] and [[Array]] of
* [[Expression]]. It does not handle time attributes but considers
them in indices.
*
+ * @param isReferenceByPosition schema mode see
[[isReferenceByPosition()]]
* @param inputType The [[TypeInformation]] against which the
[[Expression]]s are evaluated.
* @param exprs The expressions that define the field names.
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and
corresponding field positions.
*/
- protected[flink] def getFieldInfo[A](
+ protected def getFieldInfo[A](
+ isReferenceByPosition: Boolean,
inputType: TypeInformation[A],
exprs: Array[Expression])
: (Array[String], Array[Int]) = {
TableEnvironment.validateType(inputType)
+ def referenceByName(name: String, ct: CompositeType[_]): Option[(Int,
String)] = {
+ val inputIdx = ct.getFieldIndex(name)
+ if (inputIdx < 0) {
+ throw new TableException(s"$name is not a field of type $ct. " +
+ s"Expected: ${ct.getFieldNames.mkString(", ")}")
+ } else {
+ Some((inputIdx, name))
+ }
+ }
+
val indexedNames: Array[(Int, String)] = inputType match {
+
case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
throw new TableException(
"An input of GenericTypeInfo<Row> cannot be converted to Table.
" +
"Please specify the type of the input with a RowTypeInfo.")
- case a: AtomicType[_] =>
- exprs.zipWithIndex flatMap {
- case (_: TimeAttribute, _) =>
- None
- case (UnresolvedFieldReference(name), idx) if idx > 0 =>
- // only accept the first field for an atomic type
- throw new TableException("Only the first field can reference
an atomic type.")
- case (UnresolvedFieldReference(name), idx) =>
- // first field reference is mapped to atomic type
- Some((0, name))
- case _ => throw new TableException("Field reference expression
requested.")
- }
+
case t: TupleTypeInfo[A] =>
exprs.zipWithIndex flatMap {
- case (UnresolvedFieldReference(name), idx) =>
- Some((idx, name))
- case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+ case (UnresolvedFieldReference(name: String), idx) =>
+ if (isReferenceByPosition) {
+ Some((idx, name))
+ } else {
+ referenceByName(name, t)
+ }
+ case (Alias(UnresolvedFieldReference(origName), name: String,
_), _) =>
val idx = t.getFieldIndex(origName)
--- End diff --
we can use `referenceByName()` here.
---