[
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224026#comment-16224026
]
ASF GitHub Bot commented on FLINK-7548:
---------------------------------------
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4894#discussion_r147580191
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
---
@@ -56,33 +63,123 @@ class StreamTableSourceScan(
cluster,
traitSet,
getTable,
- tableSource
+ tableSource,
+ selectedFields
)
}
override def copy(
traitSet: RelTraitSet,
- newTableSource: TableSource[_])
- : PhysicalTableSourceScan = {
+ newTableSource: TableSource[_]): PhysicalTableSourceScan = {
new StreamTableSourceScan(
cluster,
traitSet,
getTable,
- newTableSource.asInstanceOf[StreamTableSource[_]]
+ newTableSource.asInstanceOf[StreamTableSource[_]],
+ selectedFields
)
}
override def translateToPlan(
tableEnv: StreamTableEnvironment,
queryConfig: StreamQueryConfig): DataStream[CRow] = {
+ val fieldIndexes = TableSourceUtil.computeIndexMapping(
+ tableSource,
+ isStreamTable = true,
+ selectedFields)
+
val config = tableEnv.getConfig
val inputDataStream =
tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
- convertToInternalRow(
- new RowSchema(getRowType),
+ val outputSchema = new RowSchema(this.getRowType)
+
+ // check that declared and actual type of table source DataStream are
identical
+ if (inputDataStream.getType != tableSource.getReturnType) {
+ throw new TableException(s"TableSource of type
${tableSource.getClass.getCanonicalName} " +
+ s"returned a DataStream of type ${inputDataStream.getType} that
does not match with the " +
+ s"type ${tableSource.getReturnType} declared by the
TableSource.getReturnType() method. " +
+ s"Please validate the implementation of the TableSource.")
+ }
+
+ // get expression to extract rowtime attribute
+ val rowtimeExpression: Option[RexNode] =
TableSourceUtil.getRowtimeExtractionExpression(
+ tableSource,
+ selectedFields,
+ cluster,
+ tableEnv.getRelBuilder,
+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+ )
+
+ // ingest table and convert and extract time attributes if necessary
+ val ingestedTable = convertToInternalRow(
+ outputSchema,
inputDataStream,
- new StreamTableSourceTable(tableSource),
- config)
+ fieldIndexes,
+ config,
+ rowtimeExpression)
+
+ // generate watermarks for rowtime indicator
+ val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+ TableSourceUtil.getRowtimeAttributeDescriptor(tableSource,
selectedFields)
+
+ val withWatermarks = if (rowtimeDesc.isDefined) {
+ val rowtimeFieldIdx =
outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+ val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+ watermarkStrategy match {
+ case p: PeriodicWatermarkAssigner =>
+ val watermarkGenerator = new
PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+ ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+ case p: PunctuatedWatermarkAssigner =>
+ val watermarkGenerator = new
PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+ ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+ }
+ } else {
+ // No need to generate watermarks if no rowtime attribute is
specified.
+ ingestedTable
+ }
+
+ withWatermarks
+ }
+}
+
+/**
+ * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+ *
+ * @param timeFieldIdx the index of the rowtime attribute.
+ * @param assigner the watermark assigner.
+ */
+private class PeriodicWatermarkAssignerWrapper(
+ timeFieldIdx: Int,
+ assigner: PeriodicWatermarkAssigner)
+ extends AssignerWithPeriodicWatermarks[CRow] {
+
+ override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+ override def extractTimestamp(crow: CRow, previousElementTimestamp:
Long): Long = {
+ val timestamp: Long =
crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+ assigner.nextTimestamp(timestamp)
+ 0L
--- End diff --
I know the timestamp in the `StreamRecord` is useless for the Table/SQL API
now, but the returned value will still be set and `StreamRecord.hasTimestamp()`
will be `true`. How about returning a negative value (e.g., -1) here, so that
it may be possible for us to erase the timestamp from the `StreamRecord` when
the extracted rowtime is negative?
> Support watermark generation for TableSource
> --------------------------------------------
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Fabian Hueske
> Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, currently the TableSource only supports defining a
> rowtime field, but does not support extracting watermarks from the rowtime field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods {{getRowtimeAttribute}} (can only be an existing field) and
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in
> strategies needs further discussion.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)