[ 
https://issues.apache.org/jira/browse/FLINK-6483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004582#comment-16004582
 ] 

ASF GitHub Bot commented on FLINK-6483:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3862#discussion_r115723393
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ---
    @@ -114,101 +78,207 @@ class RelTimeIndicatorConverter(rexBuilder: 
RexBuilder) extends RelShuttleImpl {
             "Union fields with time attributes have different types.")
         }
     
    -    updatedUnion
    +    LogicalUnion.create(inputs, union.all)
       }
     
    +  override def visit(aggregate: LogicalAggregate): RelNode = 
convertAggregate(aggregate)
    +
    +  override def visit(minus: LogicalMinus): RelNode =
    +    throw new TableException("Logical minus in a stream environment is not 
supported yet.")
    +
    +  override def visit(sort: LogicalSort): RelNode =
    +    throw new TableException("Logical sort in a stream environment is not 
supported yet.")
    +
    +  override def visit(`match`: LogicalMatch): RelNode =
    +    throw new TableException("Logical match in a stream environment is not 
supported yet.")
    +
       override def visit(other: RelNode): RelNode = other match {
    -    case scan: LogicalTableFunctionScan if
    -        stack.size() > 0 && stack.peek().isInstanceOf[LogicalCorrelate] =>
    +
    +    case uncollect: Uncollect =>
           // visit children and update inputs
    -      val updatedScan = 
super.visit(scan).asInstanceOf[LogicalTableFunctionScan]
    -
    -      val correlate = stack.peek().asInstanceOf[LogicalCorrelate]
    -
    -      // check if input field contains time indicator type
    -      // materialize field if no time indicator is present anymore
    -      // if input field is already materialized, change to timestamp type
    -      val materializer = new RexTimeIndicatorMaterializer(
    -        rexBuilder,
    -        correlate.getInputs.get(0).getRowType.getFieldList.map(_.getType))
    -      val newCall = updatedScan.getCall.accept(materializer)
    -
    -      // copy scan
    -      updatedScan.copy(
    -        updatedScan.getTraitSet,
    -        updatedScan.getInputs,
    -        newCall,
    -        updatedScan.getElementType,
    -        updatedScan.getRowType,
    -        updatedScan.getColumnMappings
    -      )
    +      val input = uncollect.getInput.accept(this)
    +      Uncollect.create(uncollect.getTraitSet, input, 
uncollect.withOrdinality)
    +
    +    case scan: LogicalTableFunctionScan =>
    +      scan
    +
    +    case aggregate: LogicalWindowAggregate =>
    +      val convAggregate = convertAggregate(aggregate)
    +
    +      LogicalWindowAggregate.create(
    +        aggregate.getWindow,
    +        aggregate.getNamedProperties,
    +        convAggregate)
     
         case _ =>
    -      super.visit(other)
    +      throw new TableException(s"Unsupported logical operator: 
${other.getClass.getSimpleName}")
    +  }
    +
    +
    +  override def visit(exchange: LogicalExchange): RelNode =
    +    throw new TableException("Logical exchange in a stream environment is 
not supported yet.")
    +
    +  override def visit(scan: TableScan): RelNode = scan
    +
    +  override def visit(scan: TableFunctionScan): RelNode =
    +    throw new TableException("Table function scan in a stream environment 
is not supported yet.")
    +
    +  override def visit(values: LogicalValues): RelNode = values
    +
    +  override def visit(filter: LogicalFilter): RelNode = {
    +    // visit children and update inputs
    +    val input = filter.getInput.accept(this)
    +
    +    // check if input field contains time indicator type
    +    // materialize field if no time indicator is present anymore
    +    // if input field is already materialized, change to timestamp type
    +    val materializer = new RexTimeIndicatorMaterializer(
    +      rexBuilder,
    +      input.getRowType.getFieldList.map(_.getType))
    +
    +    val condition = filter.getCondition.accept(materializer)
    +    LogicalFilter.create(input, condition)
    +  }
    +
    +  override def visit(project: LogicalProject): RelNode = {
    +    // visit children and update inputs
    +    val input = project.getInput.accept(this)
    +
    +    // check if input field contains time indicator type
    +    // materialize field if no time indicator is present anymore
    +    // if input field is already materialized, change to timestamp type
    +    val materializer = new RexTimeIndicatorMaterializer(
    +      rexBuilder,
    +      input.getRowType.getFieldList.map(_.getType))
    +
    +    val projects = project.getProjects.map(_.accept(materializer))
    +    val fieldNames = project.getRowType.getFieldNames
    +    LogicalProject.create(input, projects, fieldNames)
       }
     
    -  private def buildRowType(names: Seq[String], types: Seq[RelDataType]): 
RelDataType = {
    -    val fields = names.zipWithIndex.map { case (name, idx) =>
    -      new RelDataTypeFieldImpl(name, idx, types(idx))
    +  override def visit(join: LogicalJoin): RelNode =
    +    throw new TableException("Logical join in a stream environment is not 
supported yet.")
    +
    +  override def visit(correlate: LogicalCorrelate): RelNode = {
    +    // visit children and update inputs
    +    val inputs = correlate.getInputs.map(_.accept(this))
    +
    +    val right = inputs(1) match {
    +      case scan: LogicalTableFunctionScan =>
    +        // visit children and update inputs
    +        val scanInputs = scan.getInputs.map(_.accept(this))
    +
    +        // check if input field contains time indicator type
    +        // materialize field if no time indicator is present anymore
    +        // if input field is already materialized, change to timestamp type
    +        val materializer = new RexTimeIndicatorMaterializer(
    +          rexBuilder,
    +          inputs.head.getRowType.getFieldList.map(_.getType))
    +
    +        val call = scan.getCall.accept(materializer)
    +        LogicalTableFunctionScan.create(
    +          scan.getCluster,
    +          scanInputs,
    +          call,
    +          scan.getElementType,
    +          scan.getRowType,
    +          scan.getColumnMappings)
    +
    +      case _ =>
    +        inputs(1)
         }
    -    new RelRecordType(StructKind.FULLY_QUALIFIED, fields)
    +
    +    LogicalCorrelate.create(
    +      inputs.head,
    +      right,
    +      correlate.getCorrelationId,
    +      correlate.getRequiredColumns,
    +      correlate.getJoinType)
    +  }
    +
    +  private def convertAggregate(aggregate: Aggregate): LogicalAggregate = {
    --- End diff --
    
    Return a `(Option[LogicalProject], LogicalAggregate)` to cover the case 
when a `LogicalProject` needs to be prepended to materialize a grouping key or 
agg function argument (see comments below).


> Support time materialization
> ----------------------------
>
>                 Key: FLINK-6483
>                 URL: https://issues.apache.org/jira/browse/FLINK-6483
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>
> FLINK-5884 added support for time indicators. However, there are still some 
> features missing i.e. materialization of metadata timestamp.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to