schevalley2 opened a new pull request, #24506:
URL: https://github.com/apache/flink/pull/24506

   ## What is the purpose of the change
   
   This is more of a proposal to demonstrate a possible fix. I am looking for 
feedback from people who are more knowledgeable.
   
   Following this thread on the mailing list: 
https://lists.apache.org/thread/9q7sjyqptcnw1371wc190496nwpdv1tz 
   
   Given an order table:
   
   ```sql
   CREATE TABLE orders (
       order_id INT,
       price DECIMAL(6, 2),
       currency_id INT,
       order_time AS NOW(),
       WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
   ) WITH (…)
   ```
   
   and a currency rate table:
   
   ```sql
   CREATE TABLE currency_rates (
       currency_id INT,
       conversion_rate DECIMAL(4, 3),
       created_at AS NOW(),
       WATERMARK FOR created_at AS created_at - INTERVAL '2' SECOND,
       PRIMARY KEY (currency_id) NOT ENFORCED
   ) WITH (…)
   ```
   
   that we would aggregate in an unbounded way like this:
   
   ```sql
   CREATE TEMPORARY VIEW max_rates AS (
       SELECT
           currency_id,
           MAX(conversion_rate) AS max_rate
       FROM currency_rates
       GROUP BY currency_id
   );
   ```
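   
   For context, the temporal join we are attempting looks roughly like this (a sketch; the exact query from the mailing-list thread may differ, and the projected columns are illustrative):
   
   ```sql
   SELECT
       o.order_id,
       o.price * r.max_rate AS converted_price
   FROM orders AS o
   JOIN max_rates FOR SYSTEM_TIME AS OF o.order_time AS r
       ON o.currency_id = r.currency_id;
   ```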
   
   It's not possible to do a temporal join between `orders` and `max_rates`; it 
fails with the following error:
   
   ```
   Exception in thread "main" org.apache.flink.table.api.ValidationException:
   Event-Time Temporal Table Join  requires both primary key and row time 
   attribute in versioned table, but no row time attribute can be found.
   ```
   
   After some investigation we realised that the temporal join checks for 
event/processing time by looking at whether the row type contains time 
attributes, so we added another column to `max_rates` like this:
   
   ```sql
   CREATE TEMPORARY VIEW max_rates AS (
       SELECT
           currency_id,
           MAX(conversion_rate) AS max_rate,
           LAST_VALUE(created_at) AS updated_at
       FROM currency_rates
       GROUP BY currency_id
   );
   ```
   
   However, `LAST_VALUE` does not support the timestamp type 
([FLINK-15867](https://issues.apache.org/jira/browse/FLINK-15867)). We added 
that support and ended up with a planner assertion error:
   
   ```
   java.lang.AssertionError: Sql optimization: Assertion error: type mismatch:
   ref:
   TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
   input:
   TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
   
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
   ```
   
   We understood that the issue lies in the way `RelTimeIndicatorConverter` 
rewrites the `FlinkLogicalJoin`. The left and right input expressions get their 
`TimeIndicatorRelDataType` types replaced by plain timestamps, but in the 
temporal join case the join `condition` is not rewritten (whereas in the `else` 
branch it actually is).
   
   However, I suspected this might not be the root cause of the problem, so I 
also compared what happens when the `max_rates` definition is replaced with a 
simple `SELECT`, as in:
   
   ```sql
   CREATE TEMPORARY VIEW max_rates AS (
       SELECT
           currency_id,
           conversion_rate AS max_rate,
           created_at AS updated_at
       FROM currency_rates
   );
   ```
   
   What I've noticed is that the timestamp on the right side of the join is not 
replaced and stays a `TimeIndicatorRelDataType`. This is because the graph of 
`RelNode`s on the right side is:
   
   ```
   FlinkLogicalTableSourceScan -> FlinkLogicalCalc -> 
FlinkLogicalWatermarkAssigner -> FlinkLogicalSnapshot -> FlinkLogicalJoin
   ```
   
   and `WatermarkAssigner` overrides `deriveRowType`, which forces the 
`TimeIndicatorRelDataType` to remain, whereas for the `FlinkLogicalAggregate` 
it simply gets converted into a plain timestamp.
   
   So the use of `LAST_VALUE(…)` here is a bit of a hack to keep the time 
information in the query. It would not even work for every aggregation one 
might want to write.
   
   Nevertheless, it seems that supporting temporal joins on top of rolling 
aggregates would be a good idea.
   
   Looking forward to discussing this more with you.
   
   ## Brief change log
   
   ## Verifying this change
   
     - *Added tests that execute the type of queries we want to support*
     - *Added tests that check that with the new feature disabled, the join is 
not supported*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no (it's part 
of the planner)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? not documented
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
