KurtYoung commented on a change in pull request #10098:
[FLINK-14326][FLINK-14324][table] Support to apply watermark assigner in planner
URL: https://github.com/apache/flink/pull/10098#discussion_r343461493
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -50,13 +53,29 @@ class TableSourceTable[T](
this(tableSource, isStreamingMode, statistic, None, catalogTable)
}
+ Preconditions.checkNotNull(tableSource)
+ Preconditions.checkNotNull(statistic)
+ Preconditions.checkNotNull(catalogTable)
+
+ val watermarkSpec: Option[WatermarkSpec] = catalogTable
+ .getSchema
+ .getWatermarkSpecs.asScala.headOption
+
+ if (TableSourceValidation.hasRowtimeAttribute(tableSource) &&
watermarkSpec.isDefined) {
+ throw new TableException(
+ "Watermark syntax in DDL can't work together with
DefinedRowtimeAttributes interface. " +
+ "If watermark is specified in DDL, the underlying TableSource of
connector shouldn't " +
+ "implement DefinedRowtimeAttributes.")
+ }
+
// TODO implements this
// TableSourceUtil.validateTableSource(tableSource)
override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
TableSourceUtil.getRelDataType(
Review comment:
This is wrong: `TableSourceTable`'s return type should not be decided by
`TableSourceUtil`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services