pnowojski commented on a change in pull request #6787: [FLINK-8577][table]
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237008338
##########
File path:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
##########
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder)
extends RelShuttle {
override def visit(exchange: LogicalExchange): RelNode =
throw new TableException("Logical exchange in a stream environment is not supported yet.")
- override def visit(scan: TableScan): RelNode = scan
+ override def visit(scan: TableScan): RelNode = {
+ val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+ if (upsertStreamTable != null) {
+ val relTypes = scan.getRowType.getFieldList.map(_.getType)
+ val timeIndicatorIndexes = relTypes.zipWithIndex
+ .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+ .map(_._2)
+ val input = if (timeIndicatorIndexes.nonEmpty) {
+ // materialize time indicator
+ val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+ materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet)
+ } else {
+ scan
+ }
+
+ LogicalLastRow.create(
Review comment:
I have revisited our previous discussion in the design/google doc and I
still have the same concern as I had then:
> In that case this is strange. It's like having
filter/projection/aggregation nodes that after pruning/removing/pushing them
down are sometimes NO-OPs and sometimes not. Besides being strange it can have
some negative side effects:
> - presence of a no-op node (when it doesn't need to be there) can
block/mess with other optimisations or make them more complex
> - it will pollute the explain plan printed to the user. This is important,
since this will be a very costly node and it will be very hard to explain to users
when this node requires huge state and when not
> - it would make cost computations more complicated
Back then it sparked a discussion in which you said that it would never
be a no-op, and that you wanted `LastRow` to always have state and a purpose besides
upsert to retraction conversion (filtering out empty deletes). However, that was
resolved and filtering out empty deletes was dropped. That brings me back to
the issue of having this as a no-op node in the plan.
This connects with your other response regarding the naming of `LastRow`:
> The LastRow node may be a no-op node in the case such as upsert source ->
calc -> upsert sink. While LastRow will convert upsert stream to retract stream
if a downstream node needs it to, such as upsert source -> calc -> retract
sink. Whether convert to retract stream will be decided by RetractionRules.
I would still argue that it should be named `UpsertToRetractionConverter`
(or something along those lines) and that, if it is not needed, it should not be
in the plan. Maybe this means that our `RetractionRules` are not sufficient and
need some refactoring.
I can see a couple of solutions to that, but probably the best would be to
deduce whether we need to insert `UpsertToRetractionConverter` inside
the rule that is supposed to create it. Further optimisations/rewrites would
then have to correctly handle/preserve the upsert vs. retraction semantics/trait.
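To make the idea a bit more concrete, here is a rough toy sketch of "decide inside the rule whether the converter is needed". It is not Flink or Calcite code: `Node`, `Sink`, `acceptsUpserts`, `ConverterPlacement` and `rewrite` are all made-up names for illustration, and the real rule would of course work on `RelNode`s and traits instead.

```scala
// Toy plan model. None of these types exist in Flink or Calcite.
sealed trait Node
case object UpsertSource extends Node
final case class Calc(input: Node) extends Node
final case class UpsertToRetractionConverter(input: Node) extends Node
final case class Sink(input: Node, acceptsUpserts: Boolean) extends Node

object ConverterPlacement {
  // Walks the plan from the sink down, carrying whether the consumer needs
  // retractions, and adds the converter only on top of an upsert source
  // whose consumer cannot handle upserts.
  def rewrite(node: Node, needsRetractions: Boolean = false): Node = node match {
    case Sink(in, acceptsUpserts) =>
      Sink(rewrite(in, !acceptsUpserts), acceptsUpserts)
    case Calc(in) =>
      // Assumption: a calc simply forwards the requirement of its consumer.
      Calc(rewrite(in, needsRetractions))
    case UpsertSource =>
      if (needsRetractions) UpsertToRetractionConverter(UpsertSource)
      else UpsertSource
    case converter: UpsertToRetractionConverter =>
      converter // already converted, nothing to add
  }
}
```

With something along these lines, upsert source -> calc -> upsert sink would contain no converter node at all, while upsert source -> calc -> retract sink would get exactly one, so the explain plan only shows the costly node when it is really there.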