GitHub user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159772482
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -414,11 +414,14 @@ abstract class StreamTableEnvironment(
     
         val streamType = dataStream.getType
     
    +    // determine schema definition mode (by position or by name)
    +    val isRefByPosition = isReferenceByPosition(streamType, fields)
    +
         // get field names and types for all non-replaced fields
    -    val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields)
    +    val (fieldNames, fieldIndexes) = getFieldInfo[T](isRefByPosition, streamType, fields)
     
         // validate and extract time attributes
    -    val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)
    +    val (rowtime, proctime) = validateAndExtractTimeAttributes(isRefByPosition, streamType, fields)
    --- End diff --
    
    Could we call `isReferenceByPosition()` inside `validateAndExtractTimeAttributes()` instead of passing the result in? The method already receives all the required parameters (`streamType` and `fields`), and the check is only needed if `streamType` is a tuple-like type.
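
    A rough sketch of the idea, not the actual Flink code: `StreamType`, `TupleLikeType`, `OtherType`, and `SchemaModeSketch` are hypothetical stand-ins for the real type information classes, the by-position rule is an assumption, and the return value is reduced to a label so the snippet stays self-contained.

```scala
// Hypothetical stand-ins for Flink's type information classes (not the real API).
sealed trait StreamType
final case class TupleLikeType(fieldNames: Seq[String]) extends StreamType
final case class OtherType(fieldNames: Seq[String]) extends StreamType

object SchemaModeSketch {

  // Assumed rule for illustration: fields are referenced by position when
  // none of the requested names matches an existing input field name.
  def isReferenceByPosition(streamType: TupleLikeType, fields: Seq[String]): Boolean =
    fields.forall(name => !streamType.fieldNames.contains(name))

  // The mode is derived inside the method, so callers do not pass a flag.
  // Only tuple-like types trigger the check; everything else skips it.
  // Returns a label only; the real method extracts the time attributes.
  def validateAndExtractTimeAttributes(streamType: StreamType, fields: Seq[String]): String =
    streamType match {
      case t: TupleLikeType if isReferenceByPosition(t, fields) => "by-position"
      case _ => "by-name"
    }
}
```

    With this shape the call site keeps the original two-argument signature, and the position check never runs for non-tuple types.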

