[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eduardo Breijo updated FLINK-36626:
-----------------------------------
    Description: 
There is a behavior change I found when migrating from Flink 1.15 to Flink 1.18+ with regard to Flink SQL temporal joins. I haven't been able to pinpoint it, and it causes the query below to output different results.

*Flink SQL Query:*

{code:sql}
WITH assets_setpoint AS (
  SELECT
    asset_id,
    TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
    TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
    LAST_VALUE(`value`) AS `value`
  FROM asset_readings
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
  GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
)
SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM (
  SELECT
    asset_readings.`timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
  FROM asset_readings
  -- Metrics temporal lookup inner join
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  -- Assets to ignore for this computed metric definition temporal lookup left join
  LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
  WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
    -- Filter assets not present in the asset to ignore for this computed metric definition table
    AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
) AS assets_supply_air_temp
INNER JOIN assets_setpoint
  ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
{code}

*Schema:*
{noformat}
+-----------+-----------------------------+-------+-----+---------------+--------------------+
|      name |                        type |  null | key |        extras |          watermark |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | SOURCE_WATERMARK() |
|  asset_id |                      BIGINT |  TRUE |     |               |                    |
| metric_id |                         INT |  TRUE |     |               |                    |
|     value |                      DOUBLE |  TRUE |     |               |                    |
|  metadata |         MAP<STRING, STRING> |  TRUE |     |               |                    |
|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                    |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
6 rows in set
+------------------------------------------------+
|                                     table name |
+------------------------------------------------+
|                                 asset_readings |
|              asset_relationship_parent_to_unit |
| asset_to_ignore_per_computed_metric_definition |
|                                         metric |
+------------------------------------------------+
{noformat}

Results:
 * On Flink 1.15, the difference ({{assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`}}) between assets_supply_air_temp and assets_setpoint is computed correctly for every row of assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it returns the raw readings).
 * On Flink 1.18+, the same query always produces a difference of 0.
 * On Flink 1.18+, changing the join against the metric lookup table to a regular join (removing {{FOR SYSTEM_TIME AS OF `proctime`}}) makes the query output the correct value, but I don't think a regular join is what I need here, since the metric table can change over time.
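
To make the expected (Flink 1.15) result concrete, the following is a minimal plain-Python reference model of what the query should compute. It is not Flink code and makes several assumptions: the {{metric}} lookup is reduced to a metric alias stored on each reading, no asset is listed in asset_to_ignore_per_computed_metric_definition, and rows arrive in timestamp order (LAST_VALUE keeps the last value seen). The helper names {{window_start}} and {{expected_output}} and the sample rows are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical reference model of the query above, not Flink code.
# Each reading is (timestamp, asset_id, metric_alias, value); the alias stands in
# for the lookup join against `metric`. Assumes no ignored assets and that rows
# arrive in timestamp order, so LAST_VALUE is the latest setpoint in the window.
SETPOINT = "Parameters.Main.SupplyAirTempSetpoint"
SUPPLY = "Sensors.InsideAir.Supply.Temp.SupplyAirTemp"
WINDOW = timedelta(minutes=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts):
    """Start of the epoch-aligned 1-minute tumbling window that contains ts."""
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW


def expected_output(readings):
    readings = sorted(readings)
    # assets_setpoint CTE: LAST_VALUE(value) per (asset_id, 1-minute window).
    setpoints = {}
    for ts, asset_id, alias, value in readings:
        if alias == SETPOINT:
            start = window_start(ts)
            setpoints[(asset_id, start)] = (start, start + WINDOW, value)
    # Outer query: raw supply-air readings joined on asset_id and kept when
    # `timestamp` is BETWEEN start_timestamp AND end_timestamp (inclusive).
    out = []
    for ts, asset_id, alias, value in readings:
        if alias != SUPPLY:
            continue
        for (sp_asset, _), (start, end, sp_value) in setpoints.items():
            if sp_asset == asset_id and start <= ts <= end:
                out.append((ts, asset_id, value - sp_value))
    return out


# Hypothetical sample data: one asset, two setpoints and two supply readings
# within the same minute.
t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
sample_rows = [
    (t0 + timedelta(seconds=5), 1, SETPOINT, 21.0),
    (t0 + timedelta(seconds=10), 1, SUPPLY, 23.5),
    (t0 + timedelta(seconds=40), 1, SETPOINT, 22.0),
    (t0 + timedelta(seconds=50), 1, SUPPLY, 22.5),
]

for ts, asset_id, diff in expected_output(sample_rows):
    print(ts.isoformat(), asset_id, diff)
```

With this sample data, both supply readings are compared against the window's last setpoint (22.0), which yields differences of 1.5 and 0.5 rather than 0.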

 

I have tried rewriting the query in several different forms that use temporal joins, but I have not found a workaround, and I don't know why this is happening. The attached file contains the different SQL variants I tried, none of which worked.

Any help would be appreciated.

 

  was:
There is a behavior change I found when migrating from Flink 1.15 to Flink 1.18+ with regard to Flink SQL temporal joins. I haven't been able to pinpoint it, and it causes the query below to output different results.

*Flink SQL Query:*

{code:sql}
WITH assets_setpoint AS (
  SELECT
    asset_id,
    TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
    TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
    LAST_VALUE(`value`) AS `value`
  FROM asset_readings
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
  GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
)
SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM (
  SELECT
    asset_readings.`timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
  FROM asset_readings
  -- Metrics temporal lookup inner join
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  -- Assets to ignore for this computed metric definition temporal lookup left join
  LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
  WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
    -- Filter assets not present in the asset to ignore for this computed metric definition table
    AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
) AS assets_supply_air_temp
INNER JOIN assets_setpoint
  ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
{code}

*Schema:*
{noformat}
+-----------+-----------------------------+-------+-----+---------------+--------------------+
|      name |                        type |  null | key |        extras |          watermark |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
| timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | SOURCE_WATERMARK() |
|  asset_id |                      BIGINT |  TRUE |     |               |                    |
| metric_id |                         INT |  TRUE |     |               |                    |
|     value |                      DOUBLE |  TRUE |     |               |                    |
|  metadata |         MAP<STRING, STRING> |  TRUE |     |               |                    |
|  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                    |
+-----------+-----------------------------+-------+-----+---------------+--------------------+
6 rows in set
+------------------------------------------------+
|                                     table name |
+------------------------------------------------+
|                                 asset_readings |
|              asset_relationship_parent_to_unit |
| asset_to_ignore_per_computed_metric_definition |
|                                         metric |
+------------------------------------------------+
{noformat}

Results:
 * On Flink 1.15, the difference ({{assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`}}) between assets_supply_air_temp and assets_setpoint is computed correctly for every row of assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it returns the raw readings).
 * On Flink 1.18+, the same query always produces a difference of 0.
 * On Flink 1.18+, changing the join against the metric lookup table to a regular join (removing {{FOR SYSTEM_TIME AS OF `proctime`}}) makes the query output the correct value, but I don't think a regular join is what I need here.

 

I have tried rewriting the query in several different forms that use temporal joins, but I have not found a workaround, and I don't know why this is happening. The attached file contains the different SQL variants I tried, none of which worked.

Any help would be appreciated.

 


> Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-36626
>                 URL: https://issues.apache.org/jira/browse/FLINK-36626
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.18.1, 1.20.0
>         Environment: AWS Managed Apache Flink 
>            Reporter: Eduardo Breijo
>            Priority: Critical
>         Attachments: Flink-SQL-query.txt
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
