GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159772482
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
---
@@ -414,11 +414,14 @@ abstract class StreamTableEnvironment(
val streamType = dataStream.getType
+ // determine schema definition mode (by position or by name)
+ val isRefByPosition = isReferenceByPosition(streamType, fields)
+
// get field names and types for all non-replaced fields
- val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields)
+ val (fieldNames, fieldIndexes) = getFieldInfo[T](isRefByPosition,
streamType, fields)
// validate and extract time attributes
- val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType,
fields)
+ val (rowtime, proctime) =
validateAndExtractTimeAttributes(isRefByPosition, streamType, fields)
--- End diff --
Could we call `isReferenceByPosition()` inside of
`validateAndExtractTimeAttributes()`? That method already has all required
parameters, and the call is only required if `streamType` is a tuple-like type.
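
A rough sketch of what this could look like, using simplified stand-in types rather than the real Flink classes. `TypeInfo`, `TupleLikeType`, `AtomicType`, the `String` return value, and the by-position check itself are all hypothetical and only illustrate where the call would move:

```scala
// Hypothetical, simplified stand-ins for Flink's type information (not the real API).
sealed trait TypeInfo
final case class TupleLikeType(fieldNames: Seq[String]) extends TypeInfo
final case class AtomicType(name: String) extends TypeInfo

object SchemaModeSketch {

  // Stand-in check (assumption): treat the schema as by-position
  // if none of the given fields matches an input field name.
  def isReferenceByPosition(tpe: TupleLikeType, fields: Seq[String]): Boolean =
    !fields.exists(f => tpe.fieldNames.contains(f))

  // The mode is determined inside the method, and only for tuple-like types,
  // so callers no longer pass an extra Boolean argument.
  def validateAndExtractTimeAttributes(streamType: TypeInfo, fields: Seq[String]): String =
    streamType match {
      case t: TupleLikeType =>
        if (isReferenceByPosition(t, fields)) "by-position" else "by-name"
      case _ =>
        // Non-tuple types never need the check.
        "not-applicable"
    }
}
```

With this shape, the call site would keep its original two-argument form, `validateAndExtractTimeAttributes(streamType, fields)`.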
---