This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 60f6a9f1a1a Fix DayOfWeekSensor use_task_logical_date condition (#47825)
60f6a9f1a1a is described below
commit 60f6a9f1a1ab555f932503954a536b5878a96843
Author: GPK <[email protected]>
AuthorDate: Sun Mar 16 17:27:33 2025 +0000
Fix DayOfWeekSensor use_task_logical_date condition (#47825)
closes #47717
Use `run_after` from the DAG run when `logical_date` is not provided.
---
.../airflow/providers/standard/sensors/weekday.py | 16 +++++++++++++++-
.../tests/unit/standard/sensors/test_weekday.py | 22 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/providers/standard/src/airflow/providers/standard/sensors/weekday.py b/providers/standard/src/airflow/providers/standard/sensors/weekday.py
index 29d3442ddb7..842bdd3f094 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/weekday.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/weekday.py
@@ -103,7 +103,21 @@ class DayOfWeekSensor(BaseSensorOperator):
self.week_day,
WeekDay(timezone.utcnow().isoweekday()).name,
)
+
if self.use_task_logical_date:
- return context["logical_date"].isoweekday() in self._week_day_num
+ logical_date = context.get("logical_date")
+ dag_run = context.get("dag_run")
+
+ if not (logical_date or (dag_run and dag_run.run_after)):
+ raise ValueError(
+ "Either `logical_date` or `run_after` should be provided in the task context when "
+ "`use_task_logical_date` is True"
+ )
+
+ determined_weekday_num = (
+ logical_date.isoweekday() if logical_date else dag_run.run_after.isoweekday() # type: ignore[union-attr]
+ )
+
+ return determined_weekday_num in self._week_day_num
else:
return timezone.utcnow().isoweekday() in self._week_day_num
diff --git a/providers/standard/tests/unit/standard/sensors/test_weekday.py b/providers/standard/tests/unit/standard/sensors/test_weekday.py
index 28795899b73..e1bcee18650 100644
--- a/providers/standard/tests/unit/standard/sensors/test_weekday.py
+++ b/providers/standard/tests/unit/standard/sensors/test_weekday.py
@@ -25,6 +25,8 @@ from airflow.exceptions import AirflowSensorTimeout
from airflow.models import DagBag
from airflow.models.dag import DAG
from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
+from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
+from airflow.utils import timezone
from airflow.utils.timezone import datetime
from airflow.utils.weekday import WeekDay
@@ -128,3 +130,23 @@ class TestDayOfWeekSensor:
)
with pytest.raises(AirflowSensorTimeout):
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Skip on Airflow < 3.0")
+ def test_weekday_sensor_should_use_run_after_when_logical_date_is_not_provided(self, dag_maker):
+ with dag_maker(
+ "test_weekday_sensor",
+ schedule=None,
+ ) as dag:
+ op = DayOfWeekSensor(
+ task_id="weekday_sensor_check_true",
+ week_day={"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"},
+ use_task_logical_date=True,
+ dag=dag,
+ )
+ dr = dag_maker.create_dagrun(
+ run_id="manual_run",
+ start_date=DEFAULT_DATE,
+ logical_date=None,
+ **{"run_after": timezone.utcnow()},
+ )
+ assert op.poke(context={"logical_date": None, "dag_run": dr}) is True