Eduardo Breijo created FLINK-36626:
--------------------------------------

Summary: Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+
Key: FLINK-36626
URL: https://issues.apache.org/jira/browse/FLINK-36626
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.20.0, 1.18.1
Environment: AWS Managed Apache Flink
Reporter: Eduardo Breijo
Attachments: Flink-SQL-query.txt
While migrating from Flink 1.15 to Flink 1.18+, I found a behavior change in Flink SQL joins that causes the query below to produce different results. I haven't been able to pinpoint what causes it.

Flink SQL query:

WITH assets_setpoint AS (
    SELECT
        asset_id,
        TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
        TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
        LAST_VALUE(`value`) AS `value`
    FROM asset_readings
    JOIN metric FOR SYSTEM_TIME AS OF `proctime`
        ON metric.metric_id = asset_readings.metric_id
    WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
    GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
)
SELECT
    assets_supply_air_temp.`timestamp`,
    assets_supply_air_temp.asset_id,
    assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM (
    SELECT
        asset_readings.`timestamp`,
        asset_readings.asset_id,
        asset_readings.`value` AS `value`
    FROM asset_readings
    -- Metrics temporal lookup inner join
    JOIN metric FOR SYSTEM_TIME AS OF `proctime`
        ON metric.metric_id = asset_readings.metric_id
    -- Assets to ignore for this computed metric definition temporal lookup left join
    LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
        ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
        AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
    WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
        -- Filter assets not present in the asset to ignore for this computed metric definition table
        AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
) AS assets_supply_air_temp
INNER JOIN assets_setpoint
    ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp

Results:
* On Flink 1.15, the difference between assets_supply_air_temp and assets_setpoint (assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`) is computed correctly for every row of assets_supply_air_temp. Note that this subquery does not perform any window-based grouping, so it returns the raw readings.
* On Flink 1.18+, this difference is always 0.

I have tried rewriting the query in several different forms, but I have not found a workaround and I don't know why this is happening. The attached file (Flink-SQL-query.txt) contains the different SQL variants I tried, none of which worked. Any help would be appreciated.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)