[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224613#comment-16224613 ]
ASF GitHub Bot commented on FLINK-7548: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r147661037 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala --- @@ -56,33 +63,123 @@ class StreamTableSourceScan( cluster, traitSet, getTable, - tableSource + tableSource, + selectedFields ) } override def copy( traitSet: RelTraitSet, - newTableSource: TableSource[_]) - : PhysicalTableSourceScan = { + newTableSource: TableSource[_]): PhysicalTableSourceScan = { new StreamTableSourceScan( cluster, traitSet, getTable, - newTableSource.asInstanceOf[StreamTableSource[_]] + newTableSource.asInstanceOf[StreamTableSource[_]], + selectedFields ) } override def translateToPlan( tableEnv: StreamTableEnvironment, queryConfig: StreamQueryConfig): DataStream[CRow] = { + val fieldIndexes = TableSourceUtil.computeIndexMapping( + tableSource, + isStreamTable = true, + selectedFields) + val config = tableEnv.getConfig val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] - convertToInternalRow( - new RowSchema(getRowType), + val outputSchema = new RowSchema(this.getRowType) + + // check that declared and actual type of table source DataStream are identical + if (inputDataStream.getType != tableSource.getReturnType) { + throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " + + s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " + + s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. 
" + + s"Please validate the implementation of the TableSource.") + } + + // get expression to extract rowtime attribute + val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression( + tableSource, + selectedFields, + cluster, + tableEnv.getRelBuilder, + TimeIndicatorTypeInfo.ROWTIME_INDICATOR + ) + + // ingest table and convert and extract time attributes if necessary + val ingestedTable = convertToInternalRow( + outputSchema, inputDataStream, - new StreamTableSourceTable(tableSource), - config) + fieldIndexes, + config, + rowtimeExpression) + + // generate watermarks for rowtime indicator + val rowtimeDesc: Option[RowtimeAttributeDescriptor] = + TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields) + + val withWatermarks = if (rowtimeDesc.isDefined) { + val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName) + val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy + watermarkStrategy match { + case p: PeriodicWatermarkAssigner => + val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p) + ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) + case p: PunctuatedWatermarkAssigner => + val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p) + ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) + } + } else { + // No need to generate watermarks if no rowtime attribute is specified. + ingestedTable + } + + withWatermarks + } +} + +/** + * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]]. + * + * @param timeFieldIdx the index of the rowtime attribute. + * @param assigner the watermark assigner. 
+ */ +private class PeriodicWatermarkAssignerWrapper( + timeFieldIdx: Int, + assigner: PeriodicWatermarkAssigner) + extends AssignerWithPeriodicWatermarks[CRow] { + + override def getCurrentWatermark: Watermark = assigner.getWatermark + + override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = { + val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long] + assigner.nextTimestamp(timestamp) + 0L --- End diff -- I think this does not really matter. We can always erase the timestamp regardless of the value. For some operations that are built on DataStream API primitives (such as the group windows) we have to set it to the correct value and can discard it later. When a Table is converted into a DataStream, the StreamRecord timestamp is set to the rowtime attribute. > Support watermark generation for TableSource > -------------------------------------------- > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Fabian Hueske > Priority: Blocker > Fix For: 1.4.0 > > > As discussed in FLINK-7446, currently the TableSource only supports defining a > rowtime field, but does not support extracting watermarks from the rowtime field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)