This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 60f6a9f1a1a Fix DayOfWeekSensor use_task_logical_date condition (#47825)
60f6a9f1a1a is described below

commit 60f6a9f1a1ab555f932503954a536b5878a96843
Author: GPK <[email protected]>
AuthorDate: Sun Mar 16 17:27:33 2025 +0000

    Fix DayOfWeekSensor use_task_logical_date condition (#47825)
    
    closes #47717
    Uses `run_after` from the DAG run when `logical_date` is not provided.
---
 .../airflow/providers/standard/sensors/weekday.py  | 16 +++++++++++++++-
 .../tests/unit/standard/sensors/test_weekday.py    | 22 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/providers/standard/src/airflow/providers/standard/sensors/weekday.py b/providers/standard/src/airflow/providers/standard/sensors/weekday.py
index 29d3442ddb7..842bdd3f094 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/weekday.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/weekday.py
@@ -103,7 +103,21 @@ class DayOfWeekSensor(BaseSensorOperator):
             self.week_day,
             WeekDay(timezone.utcnow().isoweekday()).name,
         )
+
         if self.use_task_logical_date:
-            return context["logical_date"].isoweekday() in self._week_day_num
+            logical_date = context.get("logical_date")
+            dag_run = context.get("dag_run")
+
+            if not (logical_date or (dag_run and dag_run.run_after)):
+                raise ValueError(
+                    "Either `logical_date` or `run_after` should be provided in the task context when "
+                    "`use_task_logical_date` is True"
+                )
+
+            determined_weekday_num = (
+                logical_date.isoweekday() if logical_date else dag_run.run_after.isoweekday()  # type: ignore[union-attr]
+            )
+
+            return determined_weekday_num in self._week_day_num
         else:
             return timezone.utcnow().isoweekday() in self._week_day_num
diff --git a/providers/standard/tests/unit/standard/sensors/test_weekday.py b/providers/standard/tests/unit/standard/sensors/test_weekday.py
index 28795899b73..e1bcee18650 100644
--- a/providers/standard/tests/unit/standard/sensors/test_weekday.py
+++ b/providers/standard/tests/unit/standard/sensors/test_weekday.py
@@ -25,6 +25,8 @@ from airflow.exceptions import AirflowSensorTimeout
 from airflow.models import DagBag
 from airflow.models.dag import DAG
 from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
+from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
+from airflow.utils import timezone
 from airflow.utils.timezone import datetime
 from airflow.utils.weekday import WeekDay
 
@@ -128,3 +130,23 @@ class TestDayOfWeekSensor:
         )
         with pytest.raises(AirflowSensorTimeout):
             op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Skip on Airflow < 3.0")
+    def test_weekday_sensor_should_use_run_after_when_logical_date_is_not_provided(self, dag_maker):
+        with dag_maker(
+            "test_weekday_sensor",
+            schedule=None,
+        ) as dag:
+            op = DayOfWeekSensor(
+                task_id="weekday_sensor_check_true",
+                week_day={"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"},
+                use_task_logical_date=True,
+                dag=dag,
+            )
+            dr = dag_maker.create_dagrun(
+                run_id="manual_run",
+                start_date=DEFAULT_DATE,
+                logical_date=None,
+                **{"run_after": timezone.utcnow()},
+            )
+            assert op.poke(context={"logical_date": None, "dag_run": dr}) is True

Reply via email to