Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4710#discussion_r140774888
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -255,11 +255,30 @@ abstract class CodeGenerator(
             generateRowtimeAccess()
           case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
             // attribute is proctime indicator.
    -        // We use a null literal and generate a timestamp when we need it.
    +        // we use a null literal and generate a timestamp when we need it.
             generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
           case idx =>
    -        // regular attribute. Access attribute in input data type.
    -        generateInputAccess(input1, input1Term, idx)
    +        // get type of result field
    +        val outIdx = input1Mapping.indexOf(idx)
    +        val outType = returnType match {
    +          case pt: PojoTypeInfo[_] => pt.getTypeAt(resultFieldNames(outIdx))
    +          case ct: CompositeType[_] => ct.getTypeAt(outIdx)
    +          case t: TypeInformation[_] => t
    +        }
    +        val inputAccess = generateInputAccess(input1, input1Term, idx)
    +        // Change output type to rowtime indicator
    +        if (FlinkTypeFactory.isRowtimeIndicatorType(outType) &&
    +          (inputAccess.resultType == Types.LONG || inputAccess.resultType == Types.SQL_TIMESTAMP)) {
    +          // Hard cast possible because LONG, TIMESTAMP, and ROW_TIMEINDICATOR are internally
    --- End diff --
    
    `ROWTIME_INDICATOR`


---

Reply via email to