GitHub user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159776065
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -464,23 +470,29 @@ abstract class StreamTableEnvironment(
             throw new TableException(
               "The rowtime attribute can only be defined once in a table schema.")
           } else {
    -        val mappedIdx = streamType match {
    -          case pti: PojoTypeInfo[_] =>
    -            pti.getFieldIndex(origName.getOrElse(name))
    -          case _ => idx;
    -        }
    -        // check type of field that is replaced
    -        if (mappedIdx < 0) {
    -          throw new TableException(
    -            s"The rowtime attribute can only replace a valid field. " +
    -              s"${origName.getOrElse(name)} is not a field of type $streamType.")
    -        }
    -        else if (mappedIdx < fieldTypes.length &&
    -          !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
    -            TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
    -          throw new TableException(
    -            s"The rowtime attribute can only replace a field with a valid time type, " +
    -              s"such as Timestamp or Long. But was: ${fieldTypes(mappedIdx)}")
    +        // if the fields are referenced by position,
    +        // it is possible to replace an existing field or append the time attribute at the end
    +        if (isReferenceByPosition) {
    +
    +          val mappedIdx = streamType match {
    +            case pti: PojoTypeInfo[_] =>
    --- End diff --
    
    I think this case will never be entered, because `isReferenceByPosition` will always be `false` for `PojoTypeInfo`.
      

