This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d30dde27eac Add backcompat to openlineage provider method (#48406)
d30dde27eac is described below
commit d30dde27eac835473756d754fcd5e87ae4e00f3c
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Mar 26 17:51:28 2025 +0100
Add backcompat to openlineage provider method (#48406)
This commit adds backward compatibility to how we check whether a TaskReschedule exists.
I mistakenly added this while reducing composite keys in TaskReschedule
---
.../src/airflow/providers/openlineage/utils/utils.py | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index b29d4fe0463..e4cc5c964ad 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -210,10 +210,22 @@ def is_ti_rescheduled_already(ti: TaskInstance,
session=NEW_SESSION):
if not ti.task.reschedule:
return False
-
+ if AIRFLOW_V_3_0_PLUS:
+ return (
+ session.query(
+ exists().where(TaskReschedule.ti_id == ti.id,
TaskReschedule.try_number == ti.try_number)
+ ).scalar()
+ is True
+ )
return (
session.query(
- exists().where(TaskReschedule.ti_id == ti.id,
TaskReschedule.try_number == ti.try_number)
+ exists().where(
+ TaskReschedule.dag_id == ti.dag_id,
+ TaskReschedule.task_id == ti.task_id,
+ TaskReschedule.run_id == ti.run_id,
+ TaskReschedule.map_index == ti.map_index,
+ TaskReschedule.try_number == ti.try_number,
+ )
).scalar()
is True
)