[
https://issues.apache.org/jira/browse/FLINK-6483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004582#comment-16004582
]
ASF GitHub Bot commented on FLINK-6483:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3862#discussion_r115723393
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
---
@@ -114,101 +78,207 @@ class RelTimeIndicatorConverter(rexBuilder:
RexBuilder) extends RelShuttleImpl {
"Union fields with time attributes have different types.")
}
- updatedUnion
+ LogicalUnion.create(inputs, union.all)
}
+ override def visit(aggregate: LogicalAggregate): RelNode =
convertAggregate(aggregate)
+
+ override def visit(minus: LogicalMinus): RelNode =
+ throw new TableException("Logical minus in a stream environment is not
supported yet.")
+
+ override def visit(sort: LogicalSort): RelNode =
+ throw new TableException("Logical sort in a stream environment is not
supported yet.")
+
+ override def visit(`match`: LogicalMatch): RelNode =
+ throw new TableException("Logical match in a stream environment is not
supported yet.")
+
override def visit(other: RelNode): RelNode = other match {
- case scan: LogicalTableFunctionScan if
- stack.size() > 0 && stack.peek().isInstanceOf[LogicalCorrelate] =>
+
+ case uncollect: Uncollect =>
// visit children and update inputs
- val updatedScan =
super.visit(scan).asInstanceOf[LogicalTableFunctionScan]
-
- val correlate = stack.peek().asInstanceOf[LogicalCorrelate]
-
- // check if input field contains time indicator type
- // materialize field if no time indicator is present anymore
- // if input field is already materialized, change to timestamp type
- val materializer = new RexTimeIndicatorMaterializer(
- rexBuilder,
- correlate.getInputs.get(0).getRowType.getFieldList.map(_.getType))
- val newCall = updatedScan.getCall.accept(materializer)
-
- // copy scan
- updatedScan.copy(
- updatedScan.getTraitSet,
- updatedScan.getInputs,
- newCall,
- updatedScan.getElementType,
- updatedScan.getRowType,
- updatedScan.getColumnMappings
- )
+ val input = uncollect.getInput.accept(this)
+ Uncollect.create(uncollect.getTraitSet, input,
uncollect.withOrdinality)
+
+ case scan: LogicalTableFunctionScan =>
+ scan
+
+ case aggregate: LogicalWindowAggregate =>
+ val convAggregate = convertAggregate(aggregate)
+
+ LogicalWindowAggregate.create(
+ aggregate.getWindow,
+ aggregate.getNamedProperties,
+ convAggregate)
case _ =>
- super.visit(other)
+ throw new TableException(s"Unsupported logical operator:
${other.getClass.getSimpleName}")
+ }
+
+
+ override def visit(exchange: LogicalExchange): RelNode =
+ throw new TableException("Logical exchange in a stream environment is
not supported yet.")
+
+ override def visit(scan: TableScan): RelNode = scan
+
+ override def visit(scan: TableFunctionScan): RelNode =
+ throw new TableException("Table function scan in a stream environment
is not supported yet.")
+
+ override def visit(values: LogicalValues): RelNode = values
+
+ override def visit(filter: LogicalFilter): RelNode = {
+ // visit children and update inputs
+ val input = filter.getInput.accept(this)
+
+ // check if input field contains time indicator type
+ // materialize field if no time indicator is present anymore
+ // if input field is already materialized, change to timestamp type
+ val materializer = new RexTimeIndicatorMaterializer(
+ rexBuilder,
+ input.getRowType.getFieldList.map(_.getType))
+
+ val condition = filter.getCondition.accept(materializer)
+ LogicalFilter.create(input, condition)
+ }
+
+ override def visit(project: LogicalProject): RelNode = {
+ // visit children and update inputs
+ val input = project.getInput.accept(this)
+
+ // check if input field contains time indicator type
+ // materialize field if no time indicator is present anymore
+ // if input field is already materialized, change to timestamp type
+ val materializer = new RexTimeIndicatorMaterializer(
+ rexBuilder,
+ input.getRowType.getFieldList.map(_.getType))
+
+ val projects = project.getProjects.map(_.accept(materializer))
+ val fieldNames = project.getRowType.getFieldNames
+ LogicalProject.create(input, projects, fieldNames)
}
- private def buildRowType(names: Seq[String], types: Seq[RelDataType]):
RelDataType = {
- val fields = names.zipWithIndex.map { case (name, idx) =>
- new RelDataTypeFieldImpl(name, idx, types(idx))
+ override def visit(join: LogicalJoin): RelNode =
+ throw new TableException("Logical join in a stream environment is not
supported yet.")
+
+ override def visit(correlate: LogicalCorrelate): RelNode = {
+ // visit children and update inputs
+ val inputs = correlate.getInputs.map(_.accept(this))
+
+ val right = inputs(1) match {
+ case scan: LogicalTableFunctionScan =>
+ // visit children and update inputs
+ val scanInputs = scan.getInputs.map(_.accept(this))
+
+ // check if input field contains time indicator type
+ // materialize field if no time indicator is present anymore
+ // if input field is already materialized, change to timestamp type
+ val materializer = new RexTimeIndicatorMaterializer(
+ rexBuilder,
+ inputs.head.getRowType.getFieldList.map(_.getType))
+
+ val call = scan.getCall.accept(materializer)
+ LogicalTableFunctionScan.create(
+ scan.getCluster,
+ scanInputs,
+ call,
+ scan.getElementType,
+ scan.getRowType,
+ scan.getColumnMappings)
+
+ case _ =>
+ inputs(1)
}
- new RelRecordType(StructKind.FULLY_QUALIFIED, fields)
+
+ LogicalCorrelate.create(
+ inputs.head,
+ right,
+ correlate.getCorrelationId,
+ correlate.getRequiredColumns,
+ correlate.getJoinType)
+ }
+
+ private def convertAggregate(aggregate: Aggregate): LogicalAggregate = {
--- End diff --
Return a `(Option[LogicalProject], LogicalAggregate)` to cover the case
when a `LogicalProject` needs to be prepended to materialize a grouping key or
agg function argument (see comments below).
> Support time materialization
> ----------------------------
>
> Key: FLINK-6483
> URL: https://issues.apache.org/jira/browse/FLINK-6483
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> FLINK-5884 added support for time indicators. However, there are still some
> features missing i.e. materialization of metadata timestamp.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)