Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159776487
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
---
@@ -464,23 +470,29 @@ abstract class StreamTableEnvironment(
throw new TableException(
"The rowtime attribute can only be defined once in a table schema.")
} else {
- val mappedIdx = streamType match {
- case pti: PojoTypeInfo[_] =>
- pti.getFieldIndex(origName.getOrElse(name))
- case _ => idx;
- }
- // check type of field that is replaced
- if (mappedIdx < 0) {
- throw new TableException(
- s"The rowtime attribute can only replace a valid field. " +
- s"${origName.getOrElse(name)} is not a field of type $streamType.")
- }
- else if (mappedIdx < fieldTypes.length &&
- !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
- TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
- throw new TableException(
- s"The rowtime attribute can only replace a field with a valid time type, " +
- s"such as Timestamp or Long. But was: ${fieldTypes(mappedIdx)}")
+ // if the fields are referenced by position,
+ // it is possible to replace an existing field or append the time attribute at the end
+ if (isReferenceByPosition) {
--- End diff --
IMO, we should first distinguish the types (tuple-like, POJO, any) and only
then check the reference mode, which is only relevant for tuple-like types.
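
A rough sketch of the control flow this suggests, dispatching on the kind of type first and looking at the reference mode only in the tuple-like branch. Everything here is hypothetical: `InputKind`, `TupleLike`, `Pojo`, `AnyType` and `RowtimeFieldResolver` are stand-ins and not Flink classes, and the append/replace rules in each branch are assumptions for illustration, not the behavior of this PR.

```scala
// Hypothetical stand-ins for the three kinds of input types named above.
// These are NOT Flink classes; they only model the shape of the dispatch.
sealed trait InputKind
final case class TupleLike(fieldNames: Vector[String]) extends InputKind
final case class Pojo(fieldNames: Vector[String]) extends InputKind
case object AnyType extends InputKind

object RowtimeFieldResolver {

  /**
   * Returns Right(Some(i)) if the rowtime attribute replaces field i,
   * Right(None) if it is appended, and Left(message) if the reference is invalid.
   */
  def resolve(
      kind: InputKind,
      fieldName: String,
      position: Int,
      isReferenceByPosition: Boolean): Either[String, Option[Int]] = kind match {

    // The reference mode is consulted only for tuple-like types.
    case TupleLike(names) if isReferenceByPosition =>
      Right(if (position < names.length) Some(position) else None)

    // Tuple-like, referenced by name: replace a matching field, else append (assumption).
    case TupleLike(names) =>
      Right(Some(names.indexOf(fieldName)).filter(_ >= 0))

    // POJO fields are looked up by name; an unknown name is an error,
    // mirroring the check in the removed lines of the diff.
    case Pojo(names) =>
      val idx = names.indexOf(fieldName)
      if (idx < 0) Left(s"$fieldName is not a field of the POJO type.")
      else Right(Some(idx))

    // Any other type is treated as a single field at position 0 (assumption).
    case AnyType =>
      Right(if (position == 0) Some(0) else None)
  }
}
```

With this shape, `isReferenceByPosition` never has to be evaluated for POJO or atomic inputs, so the position-based and name-based checks cannot interfere with each other.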
---