KurtYoung commented on a change in pull request #10098:
[FLINK-14326][FLINK-14324][table] Support to apply watermark assigner in planner
URL: https://github.com/apache/flink/pull/10098#discussion_r343463516
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
##########
@@ -153,33 +155,91 @@ class StreamExecTableSourceScan(
val ingestedTable = new DataStream(planner.getExecEnv,
streamTransformation)
- // generate watermarks for rowtime indicator
- val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
- TableSourceUtil.getRowtimeAttributeDescriptor(tableSource,
tableSourceTable.selectedFields)
-
- val withWatermarks = if (rowtimeDesc.isDefined) {
- val rowtimeFieldIdx =
getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
- val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
- watermarkStrategy match {
- case p: PeriodicWatermarkAssigner =>
- val watermarkGenerator = new
PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
- ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
- case p: PunctuatedWatermarkAssigner =>
- val watermarkGenerator =
- new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p,
producedDataType)
- ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
- case _: PreserveWatermarks =>
- // The watermarks have already been provided by the underlying DataStream.
- ingestedTable
- }
+ val withWatermarks = if (tableSourceTable.watermarkSpec.isDefined) {
+ // generate watermarks for rowtime indicator from WatermarkSpec
+ val rowType = tableSourceTable.getRowType(planner.getTypeFactory)
+ val converter = planner.plannerContext.createSqlToRexConverter(rowType)
+ applyWatermarkByWatermarkSpec(
+ config,
+ ingestedTable,
+ FlinkTypeFactory.toLogicalRowType(rowType),
+ tableSourceTable.watermarkSpec.get,
+ converter)
} else {
- // No need to generate watermarks if no rowtime attribute is specified.
- ingestedTable
+ val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+ TableSourceUtil.getRowtimeAttributeDescriptor(tableSource,
tableSourceTable.selectedFields)
+
+ // generate watermarks for rowtime indicator from DefinedRowtimeAttributes
+ if (rowtimeDesc.isDefined) {
+ applyWatermarkByRowtimeAttributeDescriptor(
+ ingestedTable,
+ rowtimeDesc.get,
+ producedDataType)
+ } else {
+ // No need to generate watermarks if no rowtime attribute is specified.
+ ingestedTable
+ }
}
+
withWatermarks.getTransformation
}
- def needInternalConversion: Boolean = {
+ private def applyWatermarkByWatermarkSpec(
+ config: TableConfig,
+ input: DataStream[BaseRow],
+ inputType: RowType,
+ watermarkSpec: WatermarkSpec,
+ converter: SqlToRexConverter): DataStream[BaseRow] = {
+ // TODO: [FLINK-14473] support nested field as the rowtime attribute in the future
+ val rowtime = watermarkSpec.getRowtimeAttribute
+ if (rowtime.contains(".")) {
+ throw new TableException(
+ s"Nested field '$rowtime' as rowtime attribute is not supported right
now.")
+ }
+ val rowtimeFieldIndex = getRowType.getFieldNames.indexOf(rowtime)
+ val watermarkExpr =
converter.convertToRexNode(watermarkSpec.getWatermarkExpressionString)
+ val watermarkResultType = LogicalTypeDataTypeConverter
Review comment:
`LogicalTypeDataTypeConverter` is deprecated; please use a non-deprecated alternative here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services