Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3862#discussion_r115723393
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ---
    @@ -114,101 +78,207 @@ class RelTimeIndicatorConverter(rexBuilder: 
RexBuilder) extends RelShuttleImpl {
             "Union fields with time attributes have different types.")
         }
     
    -    updatedUnion
    +    LogicalUnion.create(inputs, union.all)
       }
     
    +  override def visit(aggregate: LogicalAggregate): RelNode = 
convertAggregate(aggregate)
    +
    +  override def visit(minus: LogicalMinus): RelNode =
    +    throw new TableException("Logical minus in a stream environment is not 
supported yet.")
    +
    +  override def visit(sort: LogicalSort): RelNode =
    +    throw new TableException("Logical sort in a stream environment is not 
supported yet.")
    +
    +  override def visit(`match`: LogicalMatch): RelNode =
    +    throw new TableException("Logical match in a stream environment is not 
supported yet.")
    +
       override def visit(other: RelNode): RelNode = other match {
    -    case scan: LogicalTableFunctionScan if
    -        stack.size() > 0 && stack.peek().isInstanceOf[LogicalCorrelate] =>
    +
    +    case uncollect: Uncollect =>
           // visit children and update inputs
    -      val updatedScan = 
super.visit(scan).asInstanceOf[LogicalTableFunctionScan]
    -
    -      val correlate = stack.peek().asInstanceOf[LogicalCorrelate]
    -
    -      // check if input field contains time indicator type
    -      // materialize field if no time indicator is present anymore
    -      // if input field is already materialized, change to timestamp type
    -      val materializer = new RexTimeIndicatorMaterializer(
    -        rexBuilder,
    -        correlate.getInputs.get(0).getRowType.getFieldList.map(_.getType))
    -      val newCall = updatedScan.getCall.accept(materializer)
    -
    -      // copy scan
    -      updatedScan.copy(
    -        updatedScan.getTraitSet,
    -        updatedScan.getInputs,
    -        newCall,
    -        updatedScan.getElementType,
    -        updatedScan.getRowType,
    -        updatedScan.getColumnMappings
    -      )
    +      val input = uncollect.getInput.accept(this)
    +      Uncollect.create(uncollect.getTraitSet, input, 
uncollect.withOrdinality)
    +
    +    case scan: LogicalTableFunctionScan =>
    +      scan
    +
    +    case aggregate: LogicalWindowAggregate =>
    +      val convAggregate = convertAggregate(aggregate)
    +
    +      LogicalWindowAggregate.create(
    +        aggregate.getWindow,
    +        aggregate.getNamedProperties,
    +        convAggregate)
     
         case _ =>
    -      super.visit(other)
    +      throw new TableException(s"Unsupported logical operator: 
${other.getClass.getSimpleName}")
    +  }
    +
    +
    +  override def visit(exchange: LogicalExchange): RelNode =
    +    throw new TableException("Logical exchange in a stream environment is 
not supported yet.")
    +
    +  override def visit(scan: TableScan): RelNode = scan
    +
    +  override def visit(scan: TableFunctionScan): RelNode =
    +    throw new TableException("Table function scan in a stream environment 
is not supported yet.")
    +
    +  override def visit(values: LogicalValues): RelNode = values
    +
    +  override def visit(filter: LogicalFilter): RelNode = {
    +    // visit children and update inputs
    +    val input = filter.getInput.accept(this)
    +
    +    // check if input field contains time indicator type
    +    // materialize field if no time indicator is present anymore
    +    // if input field is already materialized, change to timestamp type
    +    val materializer = new RexTimeIndicatorMaterializer(
    +      rexBuilder,
    +      input.getRowType.getFieldList.map(_.getType))
    +
    +    val condition = filter.getCondition.accept(materializer)
    +    LogicalFilter.create(input, condition)
    +  }
    +
    +  override def visit(project: LogicalProject): RelNode = {
    +    // visit children and update inputs
    +    val input = project.getInput.accept(this)
    +
    +    // check if input field contains time indicator type
    +    // materialize field if no time indicator is present anymore
    +    // if input field is already materialized, change to timestamp type
    +    val materializer = new RexTimeIndicatorMaterializer(
    +      rexBuilder,
    +      input.getRowType.getFieldList.map(_.getType))
    +
    +    val projects = project.getProjects.map(_.accept(materializer))
    +    val fieldNames = project.getRowType.getFieldNames
    +    LogicalProject.create(input, projects, fieldNames)
       }
     
    -  private def buildRowType(names: Seq[String], types: Seq[RelDataType]): 
RelDataType = {
    -    val fields = names.zipWithIndex.map { case (name, idx) =>
    -      new RelDataTypeFieldImpl(name, idx, types(idx))
    +  override def visit(join: LogicalJoin): RelNode =
    +    throw new TableException("Logical join in a stream environment is not 
supported yet.")
    +
    +  override def visit(correlate: LogicalCorrelate): RelNode = {
    +    // visit children and update inputs
    +    val inputs = correlate.getInputs.map(_.accept(this))
    +
    +    val right = inputs(1) match {
    +      case scan: LogicalTableFunctionScan =>
    +        // visit children and update inputs
    +        val scanInputs = scan.getInputs.map(_.accept(this))
    +
    +        // check if input field contains time indicator type
    +        // materialize field if no time indicator is present anymore
    +        // if input field is already materialized, change to timestamp type
    +        val materializer = new RexTimeIndicatorMaterializer(
    +          rexBuilder,
    +          inputs.head.getRowType.getFieldList.map(_.getType))
    +
    +        val call = scan.getCall.accept(materializer)
    +        LogicalTableFunctionScan.create(
    +          scan.getCluster,
    +          scanInputs,
    +          call,
    +          scan.getElementType,
    +          scan.getRowType,
    +          scan.getColumnMappings)
    +
    +      case _ =>
    +        inputs(1)
         }
    -    new RelRecordType(StructKind.FULLY_QUALIFIED, fields)
    +
    +    LogicalCorrelate.create(
    +      inputs.head,
    +      right,
    +      correlate.getCorrelationId,
    +      correlate.getRequiredColumns,
    +      correlate.getJoinType)
    +  }
    +
    +  private def convertAggregate(aggregate: Aggregate): LogicalAggregate = {
    --- End diff --
    
    Return a `(Option[LogicalProject], LogicalAggregate)` to cover the case 
when a `LogicalProject` needs to be prepended to materialize a grouping key or 
agg function argument (see comments below).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to