schevalley2 opened a new pull request, #24506: URL: https://github.com/apache/flink/pull/24506
## What is the purpose of the change

This is more of a proposal to demonstrate a possible fix. I am looking for feedback from people who are more knowledgeable. It follows this thread on the mailing list: https://lists.apache.org/thread/9q7sjyqptcnw1371wc190496nwpdv1tz

Given an `orders` table:

```sql
CREATE TABLE orders (
    order_id INT,
    price DECIMAL(6, 2),
    currency_id INT,
    order_time AS NOW(),
    WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
) WITH (…)
```

and a currency rate table:

```sql
CREATE TABLE currency_rates (
    currency_id INT,
    conversion_rate DECIMAL(4, 3),
    created_at AS NOW(),
    WATERMARK FOR created_at AS created_at - INTERVAL '2' SECOND,
    PRIMARY KEY (currency_id) NOT ENFORCED
) WITH (…)
```

that we aggregate in an unbounded way like this:

```sql
CREATE TEMPORARY VIEW max_rates AS (
    SELECT currency_id, MAX(conversion_rate) AS max_rate
    FROM currency_rates
    GROUP BY currency_id
);
```

it is not possible to do a temporal join between `orders` and `max_rates`; it fails with the following error:

```
Exception in thread "main" org.apache.flink.table.api.ValidationException: Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found.
```

After some investigation, we realised that the temporal join checks for event/processing time by looking at whether the row type contains a time attribute, so we added another column to `max_rates` like this:

```sql
CREATE TEMPORARY VIEW max_rates AS (
    SELECT currency_id, MAX(conversion_rate) AS max_rate, LAST_VALUE(created_at) AS updated_at
    FROM currency_rates
    GROUP BY currency_id
);
```

However, `LAST_VALUE` does not support timestamp types ([FLINK-15867](https://issues.apache.org/jira/browse/FLINK-15867)). We added that support and ended up with a planner assertion error:

```
java.lang.AssertionError: Sql optimization: Assertion error: type mismatch:
ref:
    TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
input:
    TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
```

We understood that the issue lies in the way `RelTimeIndicatorConverter` rewrites the `FlinkLogicalJoin`: the left and right input expressions get their `TimeIndicatorRelDataType` types replaced by normal timestamps, but in the temporal join case the `condition` is not rewritten (whereas in the `else` branch it actually is).

However, I thought this might not be the real solution to the problem, so I also compared what happens when `max_rates` is defined with a simple `SELECT`, as in:

```sql
CREATE TEMPORARY VIEW max_rates AS (
    SELECT currency_id, conversion_rate AS max_rate, created_at AS updated_at
    FROM currency_rates
);
```

What I noticed is that the timestamp on the right side of the join is not replaced and stays a `TimeIndicatorRelDataType`. This is because the graph of `RelNode`s on the right side is:

```
FlinkLogicalTableSourceScan -> FlinkLogicalCalc -> FlinkLogicalWatermarkAssigner -> FlinkLogicalSnapshot -> FlinkLogicalJoin
```

and `WatermarkAssigner` overrides `deriveRowType`, which forces the `TimeIndicatorRelDataType` to stay there, whereas for the `FlinkLogicalAggregate` it simply gets converted into a normal timestamp. So the use of `LAST_VALUE(…)` here is a bit of a hack to keep the time information in the query.
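For context, here is a minimal sketch of the kind of event-time temporal join we are trying to make work. It uses the standard `FOR SYSTEM_TIME AS OF` syntax from the Flink SQL documentation with the table and view names from the DDL above; the projected columns are only for illustration and the query is not part of this PR:

```sql
-- Sketch: enrich each order with the max rate known as of the order's event time.
-- With max_rates defined as an aggregate view, this currently fails with the
-- ValidationException shown above.
SELECT
    orders.order_id,
    orders.price,
    orders.price * max_rates.max_rate AS converted_price
FROM orders
LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency_id = max_rates.currency_id;
```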
The `LAST_VALUE` hack would not even work for every aggregation one might want to write. Still, it seems that supporting temporal joins against rolling aggregates would be a good idea. Looking forward to discussing this further with you.

## Brief change log

## Verifying this change

- *Added a test that executes the type of queries we want to support.*
- *Added tests that check that, with the new feature disabled, the join is not supported.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no (it's part of the planner)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? not documented
