Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4894#discussion_r147661037
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
---
@@ -56,33 +63,123 @@ class StreamTableSourceScan(
cluster,
traitSet,
getTable,
- tableSource
+ tableSource,
+ selectedFields
)
}
override def copy(
traitSet: RelTraitSet,
- newTableSource: TableSource[_])
- : PhysicalTableSourceScan = {
+ newTableSource: TableSource[_]): PhysicalTableSourceScan = {
new StreamTableSourceScan(
cluster,
traitSet,
getTable,
- newTableSource.asInstanceOf[StreamTableSource[_]]
+ newTableSource.asInstanceOf[StreamTableSource[_]],
+ selectedFields
)
}
override def translateToPlan(
tableEnv: StreamTableEnvironment,
queryConfig: StreamQueryConfig): DataStream[CRow] = {
+ val fieldIndexes = TableSourceUtil.computeIndexMapping(
+ tableSource,
+ isStreamTable = true,
+ selectedFields)
+
val config = tableEnv.getConfig
val inputDataStream =
tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
- convertToInternalRow(
- new RowSchema(getRowType),
+ val outputSchema = new RowSchema(this.getRowType)
+
+ // check that declared and actual type of table source DataStream are
identical
+ if (inputDataStream.getType != tableSource.getReturnType) {
+ throw new TableException(s"TableSource of type
${tableSource.getClass.getCanonicalName} " +
+ s"returned a DataStream of type ${inputDataStream.getType} that
does not match with the " +
+ s"type ${tableSource.getReturnType} declared by the
TableSource.getReturnType() method. " +
+ s"Please validate the implementation of the TableSource.")
+ }
+
+ // get expression to extract rowtime attribute
+ val rowtimeExpression: Option[RexNode] =
TableSourceUtil.getRowtimeExtractionExpression(
+ tableSource,
+ selectedFields,
+ cluster,
+ tableEnv.getRelBuilder,
+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+ )
+
+ // ingest table and convert and extract time attributes if necessary
+ val ingestedTable = convertToInternalRow(
+ outputSchema,
inputDataStream,
- new StreamTableSourceTable(tableSource),
- config)
+ fieldIndexes,
+ config,
+ rowtimeExpression)
+
+ // generate watermarks for rowtime indicator
+ val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+ TableSourceUtil.getRowtimeAttributeDescriptor(tableSource,
selectedFields)
+
+ val withWatermarks = if (rowtimeDesc.isDefined) {
+ val rowtimeFieldIdx =
outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+ val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+ watermarkStrategy match {
+ case p: PeriodicWatermarkAssigner =>
+ val watermarkGenerator = new
PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+ ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+ case p: PunctuatedWatermarkAssigner =>
+ val watermarkGenerator = new
PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+ ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+ }
+ } else {
+ // No need to generate watermarks if no rowtime attribute is
specified.
+ ingestedTable
+ }
+
+ withWatermarks
+ }
+}
+
+/**
+ * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+ *
+ * @param timeFieldIdx the index of the rowtime attribute.
+ * @param assigner the watermark assigner.
+ */
+private class PeriodicWatermarkAssignerWrapper(
+ timeFieldIdx: Int,
+ assigner: PeriodicWatermarkAssigner)
+ extends AssignerWithPeriodicWatermarks[CRow] {
+
+ override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+ override def extractTimestamp(crow: CRow, previousElementTimestamp:
Long): Long = {
+ val timestamp: Long =
crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+ assigner.nextTimestamp(timestamp)
+ 0L
--- End diff --
I think this does not really matter. We can always erase the timestamp
regardless of the value. For some operations that are built on DataStream API
primitives (such as the group windows), we have to set it to the correct value
and can discard it later.
When a Table is converted into a DataStream, the StreamRecord timestamp is
set to the rowtime attribute.
---