Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/5662#discussion_r173784883
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
---
@@ -148,6 +148,13 @@ object SchemaValidator {
val schema = properties.getTableSchema(SCHEMA)
+ // add all source fields first because rowtime might reference one of
them
+ toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
--- End diff --
Hi @twalthr, can we check the used `TimestampExtractor` here? Specifically,
if it's an `ExistingField`, we only included the target fields; if it's a
`StreamRecordTimestamp` we don't include extra fields; and only if it's a
custom extractor we include all the source fields.
---