pnowojski commented on a change in pull request #6787: [FLINK-8577][table]
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247926320
##########
File path:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
##########
@@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin,
indicesToMaterialize)
}
+ def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
Review comment:
I think this code is incorrect.
First, you didn't recursively call the `RelTimeIndicatorConverter` on the
`upsertToRetraction` input:
```
val rewrittenInput = upsertToRetraction.getInput.accept(this)
```
Secondly, `LogicalUpsertToRetraction` doesn't have to materialize any
fields: it doesn't invalidate the watermark guarantees (all records after a
watermark have a rowtime value above that watermark). In other words, it
preserves the rowtime fields/watermark guarantees of its input. A rowtime field
must be materialized if this contract between rowtime and watermark is violated,
as for example in non-windowed joins (rowtime fields in the join result
can be older than the watermark).
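To make that contract concrete, here is a minimal, self-contained sketch under a toy stream model. `Element`, `Record`, `Watermark` and `WatermarkContract.holds` are hypothetical names, not Flink classes. The check passes only if no record carries a rowtime at or below the highest watermark emitted before it.

```scala
// Hypothetical toy model: a stream interleaves records (with a rowtime) and watermarks.
sealed trait Element
final case class Record(rowtime: Long) extends Element
final case class Watermark(ts: Long) extends Element

object WatermarkContract {
  // True if every record has a rowtime strictly greater than the highest
  // watermark seen before it, i.e. no record is "late" w.r.t. the watermark.
  def holds(stream: Seq[Element]): Boolean =
    stream
      .foldLeft((Long.MinValue, true)) {
        case ((wm, ok), Watermark(ts)) => (math.max(wm, ts), ok)
        case ((wm, ok), Record(rt))    => (wm, ok && rt > wm)
      }
      ._2
}
```

For example, `Seq(Record(1), Watermark(1), Record(2))` satisfies the check, while `Seq(Record(5), Watermark(10), Record(5))` does not. The second shape is what a non-windowed join can emit when it outputs a joined row carrying an old rowtime after the watermark has already advanced.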
For `LogicalUpsertToRetraction` that's not the case, is it? If I'm
correct (and please correct me if I'm wrong), the code should look just like
this:
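```
def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
  // Convert time indicators in the input; no fields need to be materialized,
  // because rowtime/watermark guarantees are preserved.
  val rewrittenInput = upsertToRetraction.getInput.accept(this)
  upsertToRetraction.copy(
    upsertToRetraction.getTraitSet,
    java.util.Collections.singletonList(rewrittenInput))
}
```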
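The recursion pattern can be illustrated with a toy model that has no Calcite dependency. `Node`, `Scan`, `Calc`, `UpsertToRetraction`, `Shuttle` and `ToyTimeIndicatorConverter` are hypothetical stand-ins for `RelNode`, `RelShuttle` and `RelTimeIndicatorConverter`, and "converting" a `Calc` is modeled by simply wrapping its expression. The plan `UpsertToRetraction(Calc(Scan))` mimics a `Calc` ending up below the upsert node; with `recurseIntoUpsert = false` that `Calc` is never reached.

```scala
// Hypothetical, simplified plan nodes standing in for Calcite RelNodes.
sealed trait Node {
  // Double dispatch, loosely mirroring RelNode.accept(RelShuttle).
  def accept(shuttle: Shuttle): Node = shuttle.visit(this)
}
final case class Scan(table: String) extends Node
final case class Calc(expr: String, input: Node) extends Node
final case class UpsertToRetraction(input: Node) extends Node

trait Shuttle {
  def visit(node: Node): Node
}

// Toy converter: "converts" every Calc it reaches by wrapping its expression.
// recurseIntoUpsert = false models a visit method that ignores its input.
class ToyTimeIndicatorConverter(recurseIntoUpsert: Boolean) extends Shuttle {
  def visit(node: Node): Node = node match {
    case scan: Scan => scan
    case calc: Calc =>
      Calc(s"converted(${calc.expr})", calc.input.accept(this))
    case upsert: UpsertToRetraction =>
      if (recurseIntoUpsert) {
        // Recurse into the INPUT, not into `upsert` itself: `upsert.accept(this)`
        // would dispatch straight back to this case and never terminate.
        val rewrittenInput = upsert.input.accept(this)
        // Nothing to materialize: copy the node with its rewritten input.
        upsert.copy(input = rewrittenInput)
      } else {
        upsert
      }
  }
}

object ShuttleDemo {
  val plan: Node = UpsertToRetraction(Calc("rowTime1 + 1", Scan("UpsertTable")))
}
```

Copying the node rather than wrapping it in a projection reflects the argument above: the upsert node only needs its input rewritten, not any of its own fields materialized.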
I think the second issue would be caught by a test that uses a time-windowed
join/aggregation on the upsert source (this should work, but with the current
version of this PR I would expect it to fail):
```
SELECT
key, max(value)
FROM
UpsertTable
GROUP BY
TUMBLE(rowTime1, INTERVAL '1' DAY), key
```
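As a reminder of what this query should compute, here is a toy in-memory sketch, assuming append-only input with epoch-millisecond rowtimes. `Row` and `TumbleMax` are hypothetical names; the sketch ignores both watermarks and the retraction messages produced by `LogicalUpsertToRetraction`, so it only shows how rows are assigned to 1-day tumbling windows and aggregated per key.

```scala
// Hypothetical toy model of:
//   SELECT key, max(value) FROM UpsertTable
//   GROUP BY TUMBLE(rowTime1, INTERVAL '1' DAY), key
// Append-only rows only (no retractions); watermarks are ignored.
final case class Row(rowTime1: Long, key: String, value: Int)

object TumbleMax {
  // One day in milliseconds; rowtimes are assumed to be epoch milliseconds.
  val DayMillis: Long = 24L * 60 * 60 * 1000

  // Start of the epoch-aligned tumbling window of the given size containing ts.
  // floorMod keeps the result correct for negative timestamps as well.
  def windowStart(ts: Long, size: Long): Long = ts - java.lang.Math.floorMod(ts, size)

  // (windowStart, key) -> max(value), i.e. what each window emits once closed.
  def run(rows: Seq[Row]): Map[(Long, String), Int] =
    rows
      .groupBy(r => (windowStart(r.rowTime1, DayMillis), r.key))
      .map { case (group, groupRows) => group -> groupRows.map(_.value).max }
}
```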
The first issue is probably difficult to test/trigger, since at the moment when
`RelTimeIndicatorConverter` is executed, `LogicalUpsertToRetraction` is always
directly after the `TableScan` node. However, if we changed the order of
`CalcUpsertToRetractionTransposeRule` and `RelTimeIndicatorConverter`, the current
code would fail to materialize fields for queries like:
```
SELECT
key, max(value)
FROM (
SELECT rowTime1 + 1 as rowTime2, key, value FROM UpsertTable)
GROUP BY
TUMBLE(rowTime2, INTERVAL '1' DAY), key
```
Even if I'm wrong, could you add those two unit tests?