wuchong commented on a change in pull request #10098:
[FLINK-14326][FLINK-14324][table] Support to apply watermark assigner in planner
URL: https://github.com/apache/flink/pull/10098#discussion_r343508658
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
##########
@@ -153,33 +155,91 @@ class StreamExecTableSourceScan(
val ingestedTable = new DataStream(planner.getExecEnv, streamTransformation)
- // generate watermarks for rowtime indicator
- val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
- TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, tableSourceTable.selectedFields)
-
- val withWatermarks = if (rowtimeDesc.isDefined) {
- val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
- val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
- watermarkStrategy match {
- case p: PeriodicWatermarkAssigner =>
- val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
- ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
- case p: PunctuatedWatermarkAssigner =>
- val watermarkGenerator =
- new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, producedDataType)
- ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
- case _: PreserveWatermarks =>
- // The watermarks have already been provided by the underlying DataStream.
- ingestedTable
- }
+ val withWatermarks = if (tableSourceTable.watermarkSpec.isDefined) {
+ // generate watermarks for rowtime indicator from WatermarkSpec
+ val rowType = tableSourceTable.getRowType(planner.getTypeFactory)
+ val converter = planner.plannerContext.createSqlToRexConverter(rowType)
+ applyWatermarkByWatermarkSpec(
+ config,
+ ingestedTable,
Review comment:
Unfortunately, we can't. The `WatermarkStrategy` in
`RowtimeAttributeDescriptor` is an imperative API, so it's hard to convert it
into a `RexNode` or a UDF.
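
To illustrate the difference, here is a minimal, self-contained sketch. All
names in it (`ImperativeAssigner`, `BoundedDelayAssigner`, `Expr`, `ExprEval`)
are hypothetical stand-ins rather than Flink or Calcite classes. The imperative
assigner hides its logic in stateful method bodies, while the declarative form
is a plain expression tree that a planner could inspect and translate.

```scala
// Hypothetical, simplified stand-ins; these are NOT Flink's or Calcite's classes.

// Imperative style: the watermark logic lives in opaque, stateful method bodies.
trait ImperativeAssigner {
  def nextTimestamp(ts: Long): Unit
  def currentWatermark: Long
}

final class BoundedDelayAssigner(delayMs: Long) extends ImperativeAssigner {
  // Start high enough that subtracting the delay cannot underflow.
  private var maxTs: Long = Long.MinValue + delayMs
  override def nextTimestamp(ts: Long): Unit = if (ts > maxTs) maxTs = ts
  override def currentWatermark: Long = maxTs - delayMs
}

// Declarative style: the watermark definition is data (an expression tree),
// loosely analogous to a row expression such as `rowtime - 5000`.
sealed trait Expr
final case class FieldRef(name: String) extends Expr
final case class Literal(value: Long) extends Expr
final case class Minus(left: Expr, right: Expr) extends Expr

object ExprEval {
  // Because the tree is plain data, it can be walked, rewritten, or compiled.
  def eval(e: Expr, row: Map[String, Long]): Long = e match {
    case FieldRef(n)  => row(n)
    case Literal(v)   => v
    case Minus(l, r)  => eval(l, row) - eval(r, row)
  }
}

object WatermarkSketch {
  def main(args: Array[String]): Unit = {
    val spec: Expr = Minus(FieldRef("rowtime"), Literal(5000L))
    println(ExprEval.eval(spec, Map("rowtime" -> 12000L)))

    val assigner = new BoundedDelayAssigner(5000L)
    assigner.nextTimestamp(12000L)
    println(assigner.currentWatermark)
  }
}
```

Both forms compute the same value here, but only the `Expr` tree exposes its
structure. Going the other way, from an arbitrary `ImperativeAssigner` to an
expression, would require analyzing user code, which is the difficulty
described above.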