Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/5662#discussion_r173857218
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
---
@@ -148,6 +148,13 @@ object SchemaValidator {
val schema = properties.getTableSchema(SCHEMA)
+ // add all source fields first because rowtime might reference one of
them
+ toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
--- End diff --
I think we should first remove the added source fields before adding the
explicit mappings with the following snippet.
```
// add explicit mapping
case Some(source) =>
// should add mapping.remove(source)
mapping.put(name, source)
```
---