This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5479d0b3875 Fix logical_date error in BranchDateTimeOperator and BranchDayOfWeekOperator (#48486)
5479d0b3875 is described below

commit 5479d0b387577a8c5a4b325968b56ebd52a985f3
Author: GPK <[email protected]>
AuthorDate: Sat Mar 29 08:38:59 2025 +0000

    Fix logical_date error in BranchDateTimeOperator and BranchDayOfWeekOperator (#48486)
    
    closes: #48473
---
 .../providers/standard/operators/datetime.py       |  5 +++-
 .../providers/standard/operators/weekday.py        |  7 +++--
 .../tests/unit/standard/operators/test_datetime.py | 31 ++++++++++++++++++++++
 .../tests/unit/standard/operators/test_weekday.py  | 26 ++++++++++++++++++
 4 files changed, 66 insertions(+), 3 deletions(-)

diff --git a/providers/standard/src/airflow/providers/standard/operators/datetime.py b/providers/standard/src/airflow/providers/standard/operators/datetime.py
index dd788dd95fa..18c6b0a7a9a 100644
--- a/providers/standard/src/airflow/providers/standard/operators/datetime.py
+++ b/providers/standard/src/airflow/providers/standard/operators/datetime.py
@@ -77,7 +77,10 @@ class BranchDateTimeOperator(BaseBranchOperator):
 
     def choose_branch(self, context: Context) -> str | Iterable[str]:
         if self.use_task_logical_date:
-            now = context["logical_date"]
+            now = context.get("logical_date")
+            if not now:
+                dag_run = context.get("dag_run")
+                now = dag_run.run_after  # type: ignore[union-attr]
         else:
             now = timezone.coerce_datetime(timezone.utcnow())
         lower, upper = target_times_as_dates(now, self.target_lower, self.target_upper)
diff --git a/providers/standard/src/airflow/providers/standard/operators/weekday.py b/providers/standard/src/airflow/providers/standard/operators/weekday.py
index 89a361385e9..bcae0b746c5 100644
--- a/providers/standard/src/airflow/providers/standard/operators/weekday.py
+++ b/providers/standard/src/airflow/providers/standard/operators/weekday.py
@@ -116,10 +116,13 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
 
     def choose_branch(self, context: Context) -> str | Iterable[str]:
         if self.use_task_logical_date:
-            now = context["logical_date"]
+            now = context.get("logical_date")
+            if not now:
+                dag_run = context.get("dag_run")
+                now = dag_run.run_after  # type: ignore[union-attr]
         else:
             now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
 
-        if now.isoweekday() in self._week_day_num:
+        if now.isoweekday() in self._week_day_num:  # type: ignore[union-attr]
             return self.follow_task_ids_if_true
         return self.follow_task_ids_if_false
diff --git a/providers/standard/tests/unit/standard/operators/test_datetime.py b/providers/standard/tests/unit/standard/operators/test_datetime.py
index f23c0f9b757..790a4d8c549 100644
--- a/providers/standard/tests/unit/standard/operators/test_datetime.py
+++ b/providers/standard/tests/unit/standard/operators/test_datetime.py
@@ -312,3 +312,34 @@ class TestBranchDateTimeOperator:
                     "branch_2": State.SKIPPED,
                 }
             )
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Skip on Airflow < 3.0")
+    @time_machine.travel("2020-12-01 09:00:00")
+    def test_choose_branch_should_use_run_after_when_logical_date_none(self, dag_maker):
+        with dag_maker(
+            "branch_datetime_operator_uses_run_after",
+            default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
+            schedule=INTERVAL,
+            serialized=True,
+        ):
+            branch_1 = EmptyOperator(task_id="branch_1")
+            branch_2 = EmptyOperator(task_id="branch_2")
+
+            branch_op = BranchDateTimeOperator(
+                task_id="datetime_branch",
+                follow_task_ids_if_true="branch_1",
+                follow_task_ids_if_false="branch_2",
+                target_upper=datetime.datetime(2020, 9, 7, 11, 0, 0),
+                target_lower=datetime.datetime(2020, 6, 7, 10, 0, 0),
+                use_task_logical_date=True,
+            )
+            branch_1.set_upstream(branch_op)
+            branch_2.set_upstream(branch_op)
+
+        dr = dag_maker.create_dagrun(
+            run_id="manual__run_after",
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+            **{"run_after": timezone.datetime(2020, 8, 7)},
+        )
+        assert branch_op.choose_branch(context={"dag_run": dr}) == "branch_1"
diff --git a/providers/standard/tests/unit/standard/operators/test_weekday.py b/providers/standard/tests/unit/standard/operators/test_weekday.py
index ddcacddfc43..583f20fd663 100644
--- a/providers/standard/tests/unit/standard/operators/test_weekday.py
+++ b/providers/standard/tests/unit/standard/operators/test_weekday.py
@@ -179,6 +179,32 @@ class TestBranchDayOfWeekOperator:
                 },
             )
 
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Skip on Airflow < 3.0")
+    @time_machine.travel("2021-01-25")  # Monday
+    def test_choose_branch_should_use_run_after_when_logical_date_none(self, dag_maker):
+        with dag_maker(
+            "branch_day_of_week_operator_test", start_date=DEFAULT_DATE, schedule=INTERVAL, serialized=True
+        ):
+            branch_op = BranchDayOfWeekOperator(
+                task_id="make_choice",
+                follow_task_ids_if_true="branch_1",
+                follow_task_ids_if_false="branch_2",
+                week_day="Wednesday",
+                use_task_logical_date=True,  # We compare to DEFAULT_DATE which is Wednesday
+            )
+            branch_1 = EmptyOperator(task_id="branch_1")
+            branch_2 = EmptyOperator(task_id="branch_2")
+            branch_1.set_upstream(branch_op)
+            branch_2.set_upstream(branch_op)
+
+        dr = dag_maker.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            **{"run_after": DEFAULT_DATE},
+        )
+        assert branch_op.choose_branch(context={"dag_run": dr}) == "branch_1"
+
     @time_machine.travel("2021-01-25")  # Monday
     def test_branch_follow_false(self, dag_maker):
         """Checks if BranchDayOfWeekOperator follow false branch"""

Reply via email to