[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224613#comment-16224613
 ] 

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147661037
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
    @@ -56,33 +63,123 @@ class StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      tableSource
    +      tableSource,
    +      selectedFields
         )
       }
     
       override def copy(
           traitSet: RelTraitSet,
    -      newTableSource: TableSource[_])
    -    : PhysicalTableSourceScan = {
    +      newTableSource: TableSource[_]): PhysicalTableSourceScan = {
     
         new StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      newTableSource.asInstanceOf[StreamTableSource[_]]
    +      newTableSource.asInstanceOf[StreamTableSource[_]],
    +      selectedFields
         )
       }
     
       override def translateToPlan(
           tableEnv: StreamTableEnvironment,
           queryConfig: StreamQueryConfig): DataStream[CRow] = {
     
    +    val fieldIndexes = TableSourceUtil.computeIndexMapping(
    +      tableSource,
    +      isStreamTable = true,
    +      selectedFields)
    +
         val config = tableEnv.getConfig
         val inputDataStream = 
tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
    -    convertToInternalRow(
    -      new RowSchema(getRowType),
    +    val outputSchema = new RowSchema(this.getRowType)
    +
    +    // check that declared and actual type of table source DataStream are 
identical
    +    if (inputDataStream.getType != tableSource.getReturnType) {
    +      throw new TableException(s"TableSource of type 
${tableSource.getClass.getCanonicalName} " +
    +        s"returned a DataStream of type ${inputDataStream.getType} that 
does not match with the " +
    +        s"type ${tableSource.getReturnType} declared by the 
TableSource.getReturnType() method. " +
    +        s"Please validate the implementation of the TableSource.")
    +    }
    +
    +    // get expression to extract rowtime attribute
    +    val rowtimeExpression: Option[RexNode] = 
TableSourceUtil.getRowtimeExtractionExpression(
    +      tableSource,
    +      selectedFields,
    +      cluster,
    +      tableEnv.getRelBuilder,
    +      TimeIndicatorTypeInfo.ROWTIME_INDICATOR
    +    )
    +
    +    // ingest table and convert and extract time attributes if necessary
    +    val ingestedTable = convertToInternalRow(
    +      outputSchema,
           inputDataStream,
    -      new StreamTableSourceTable(tableSource),
    -      config)
    +      fieldIndexes,
    +      config,
    +      rowtimeExpression)
    +
    +    // generate watermarks for rowtime indicator
    +    val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
    +      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, 
selectedFields)
    +
    +    val withWatermarks = if (rowtimeDesc.isDefined) {
    +      val rowtimeFieldIdx = 
outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
    +      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
    +      watermarkStrategy match {
    +        case p: PeriodicWatermarkAssigner =>
    +          val watermarkGenerator = new 
PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
    +          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
    +        case p: PunctuatedWatermarkAssigner =>
    +          val watermarkGenerator = new 
PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
    +          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
    +      }
    +    } else {
    +      // No need to generate watermarks if no rowtime attribute is 
specified.
    +      ingestedTable
    +    }
    +
    +    withWatermarks
    +  }
    +}
    +
    +/**
    +  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
    +  *
    +  * @param timeFieldIdx the index of the rowtime attribute.
    +  * @param assigner the watermark assigner.
    +  */
    +private class PeriodicWatermarkAssignerWrapper(
    +    timeFieldIdx: Int,
    +    assigner: PeriodicWatermarkAssigner)
    +  extends AssignerWithPeriodicWatermarks[CRow] {
    +
    +  override def getCurrentWatermark: Watermark = assigner.getWatermark
    +
    +  override def extractTimestamp(crow: CRow, previousElementTimestamp: 
Long): Long = {
    +    val timestamp: Long = 
crow.row.getField(timeFieldIdx).asInstanceOf[Long]
    +    assigner.nextTimestamp(timestamp)
    +    0L
    --- End diff --
    
    I think this does not really matter. We can always erase the timestamp
regardless of the value. For some operations that are built on DataStream API
primitives (such as the group windows), we have to set it to the correct value
and can discard it later.
    When a Table is converted into a DataStream, the StreamRecord timestamp is 
set to the rowtime attribute.


> Support watermark generation for TableSource
> --------------------------------------------
>
>                 Key: FLINK-7548
>                 URL: https://issues.apache.org/jira/browse/FLINK-7548
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Fabian Hueske
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining a
> rowtime field but does not support extracting watermarks from the rowtime field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods: {{getRowtimeAttribute}} (can only be an existing field) and
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} interface will be
> marked as deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in
> strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to