Eduardo Breijo created FLINK-36626:
--------------------------------------

             Summary: Flink SQL JOINs behavior change from Flink 1.15 to Flink 
1.18+
                 Key: FLINK-36626
                 URL: https://issues.apache.org/jira/browse/FLINK-36626
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.20.0, 1.18.1
         Environment: AWS Managed Apache Flink 
            Reporter: Eduardo Breijo
         Attachments: Flink-SQL-query.txt

After migrating from Flink 1.15 to Flink 1.18+, I found a behavior change in 
Flink SQL joins that I have not been able to pinpoint. It causes the query 
below to output different results.

Flink SQL Query:

WITH assets_setpoint AS (
    SELECT
      asset_id,
      TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
      TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
      LAST_VALUE(`value`) AS `value`
    FROM asset_readings
    JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
    WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
    GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
  )
  SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
  FROM (
    SELECT asset_readings.`timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
    FROM asset_readings
    -- Metrics temporal lookup inner join
    JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
    -- Assets to ignore for this computed metric definition temporal lookup left join
    LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
    WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
    -- Filter assets not present in the asset to ignore for this computed metric definition table
    AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
  ) AS assets_supply_air_temp
  INNER JOIN assets_setpoint
  ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
  WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp

Results:
 * On Flink 1.15, the difference (assets_supply_air_temp.`value` - 
assets_setpoint.`value` AS `value`) between assets_supply_air_temp and 
assets_setpoint is computed correctly for every row of 
assets_supply_air_temp (note that this subquery performs no window-based 
grouping, so it returns raw readings).
 * On Flink 1.18+, this difference is always 0.
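
For reference, the result expected from the query can be modelled outside Flink. The following is only a minimal Python sketch of the intended semantics, not Flink code and not an explanation of the regression. The function names and sample readings are hypothetical, and it assumes that LAST_VALUE should pick the reading with the latest timestamp inside each one-minute window (Flink itself uses arrival order).

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)


def tumble_start(ts):
    # Start of the 1-minute tumbling window that contains ts.
    return ts.replace(second=0, microsecond=0)


def setpoint_windows(setpoint_readings):
    # Models the assets_setpoint CTE: last value per (asset_id, window).
    # Assumption: "last" means latest timestamp, so rows are sorted first.
    windows = {}
    for ts, asset_id, value in sorted(setpoint_readings):
        windows[(asset_id, tumble_start(ts))] = value  # later rows overwrite
    return windows


def expected_differences(supply_readings, setpoint_readings):
    # Models the outer join: each raw supply reading is matched to the
    # setpoint window of the same asset whose [start, end] range contains it
    # (BETWEEN is inclusive on both ends).
    windows = setpoint_windows(setpoint_readings)
    out = []
    for ts, asset_id, value in supply_readings:
        for (w_asset, start), setpoint in windows.items():
            if w_asset == asset_id and start <= ts <= start + WINDOW:
                out.append((ts, asset_id, value - setpoint))
    return out


# Hypothetical sample data: (timestamp, asset_id, value)
t0 = datetime(2024, 1, 1, 12, 0, 0)
setpoint = [
    (t0 + timedelta(seconds=5), "a1", 20.0),
    (t0 + timedelta(seconds=40), "a1", 21.0),  # last setpoint in the window
]
supply = [
    (t0 + timedelta(seconds=10), "a1", 23.5),
    (t0 + timedelta(seconds=50), "a1", 19.0),
]

rows = expected_differences(supply, setpoint)
for row in rows:
    print(row)  # differences are 2.5 and -2.0, not 0
```

With this sample data, every raw supply reading yields its own non-zero difference against the window's setpoint, which matches the behavior described for Flink 1.15.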

 

I have tried rewriting the query in several different forms, but I have not 
found a workaround, and I don't know why this is happening. The attached file 
contains the SQL variants I tried, none of which resolved the problem.

Any help would be appreciated.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
