This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 25c4b6bf1be Fix bug in `previous-successful-dagrun` Execution API 
endpoint (#48520)
25c4b6bf1be is described below

commit 25c4b6bf1bed14d15122f2fe8bffef0ed3767bb0
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Mar 29 18:29:23 2025 +0530

    Fix bug in `previous-successful-dagrun` Execution API endpoint (#48520)
    
    The bug was as mentioned in https://github.com/apache/airflow/issues/48503 
where the `logical_date` of the current TI was None.
    
    We should return an empty `PrevSuccessfulDagRunResponse()` in that case, but because we 
weren't checking for a `None` `logical_date`, the API server failed with:
    
    ```
    
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:1160:
 in test_ti_with_none_as_logical_date
        response = 
client.get(f"/execution/task-instances/{ti.id}/previous-successful-dagrun")
    /usr/local/lib/python3.10/site-packages/starlette/testclient.py:465: in get
        return super().get(
    ...
    ...
    /usr/local/lib/python3.10/site-packages/cadwyn/schema_generation.py:504: in 
__call__
        return self._original_callable(*args, **kwargs)
    
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:555:
 in get_previous_successful_dagrun
        DR.logical_date < task_instance.logical_date,
    /usr/local/lib/python3.10/site-packages/sqlalchemy/sql/operators.py:368: in 
__lt__
        return self.operate(lt, other)
    /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:322: 
in operate
        return op(self.comparator, *other, **kwargs)
    /usr/local/lib/python3.10/site-packages/sqlalchemy/sql/operators.py:368: in 
__lt__
        return self.operate(lt, other)
    /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/properties.py:426: 
in operate
        return op(self.__clause_element__(), *other, **kwargs)
    /usr/local/lib/python3.10/site-packages/sqlalchemy/sql/operators.py:368: in 
__lt__
        return self.operate(lt, other)
    /usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:873: in 
operate
        return op(self.comparator, *other, **kwargs)
    /usr/local/lib/python3.10/site-packages/sqlalchemy/sql/operators.py:368: in 
__lt__
        return self.operate(lt, other)
    /usr/local/lib/python3.10/site-packages/sqlalchemy/sql/type_api.py:1379: in 
operate
        return super(TypeDecorator.Comparator, self).operate(
    /usr/local/lib/python3.10/site-packages/sqlalchemy/sql/type_api.py:77: in 
operate
        return o[0](self.expr, op, *(other + o[1:]), **kwargs)
    
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/default_comparator.py:95:
 in _boolean_compare
        raise exc.ArgumentError(
    E   sqlalchemy.exc.ArgumentError: Only '=', '!=', 'is_()', 'is_not()', 
'is_distinct_from()', 'is_not_distinct_from()' operators can be used with 
None/True/False
    ```
---
 .../execution_api/routes/task_instances.py         |  2 +-
 .../versions/head/test_task_instances.py           | 33 ++++++++++++++++++++++
 airflow-core/tests/unit/utils/test_log_handlers.py |  6 ++--
 devel-common/src/tests_common/pytest_plugin.py     | 19 ++++++++-----
 4 files changed, 49 insertions(+), 11 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index e78e2d41063..ff0ca516314 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -545,7 +545,7 @@ def get_previous_successful_dagrun(
     """
     ti_id_str = str(task_instance_id)
     task_instance = session.scalar(select(TI).where(TI.id == ti_id_str))
-    if not task_instance:
+    if not task_instance or not task_instance.logical_date:
         return PrevSuccessfulDagRunResponse()
 
     dag_run = session.scalar(
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index feaa508a87b..81c99b271be 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1132,6 +1132,39 @@ class TestPreviousDagRun:
             "end_date": None,
         }
 
+    def test_ti_with_none_as_logical_date(self, client, session, 
create_task_instance, dag_maker):
+        ti = create_task_instance(
+            task_id="test_ti_with_none_as_logical_date",
+            dag_id="test_dag",
+            logical_date=None,
+            state=State.RUNNING,
+            start_date=timezone.datetime(2024, 1, 17),
+            session=session,
+        )
+        session.commit()
+
+        assert ti.logical_date is None
+
+        dr1 = dag_maker.create_dagrun(
+            run_id="test_ti_with_none_as_logical_date",
+            logical_date=timezone.datetime(2025, 1, 17),
+            run_type="scheduled",
+            state=State.SUCCESS,
+            session=session,
+        )
+        dr1.end_date = timezone.datetime(2025, 1, 17, 1, 0, 0)
+
+        session.commit()
+
+        response = 
client.get(f"/execution/task-instances/{ti.id}/previous-successful-dagrun")
+        assert response.status_code == 200
+        assert response.json() == {
+            "data_interval_start": None,
+            "data_interval_end": None,
+            "start_date": None,
+            "end_date": None,
+        }
+
 
 class TestGetRescheduleStartDate:
     def test_get_start_date(self, client, session, create_task_instance):
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py 
b/airflow-core/tests/unit/utils/test_log_handlers.py
index f94c83fe92b..72726cd9401 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -549,7 +549,7 @@ class TestFilenameRendering:
         # With catchup=False:
         # - If logical_date is None, it will use current date as the logical 
date
         # - If logical_date is explicitly provided, it will use that date 
regardless of catchup setting
-        expected_date = logical_date if logical_date is not None else 
filename_rendering_ti.logical_date
+        expected_date = logical_date if logical_date is not None else 
filename_rendering_ti.run_after
 
         expected_filename = (
             
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
@@ -557,7 +557,7 @@ class TestFilenameRendering:
         )
         fth = FileTaskHandler("")
         rendered_filename = fth._render_filename(filename_rendering_ti, 42)
-        assert expected_filename == rendered_filename
+        assert rendered_filename == expected_filename
 
     def test_jinja_rendering(self, create_log_template, create_task_instance, 
logical_date):
         create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ 
try_number }}.log")
@@ -592,7 +592,7 @@ class TestFilenameRendering:
         # With catchup=False:
         # - If logical_date is None, it will use current date as the logical 
date
         # - If logical_date is explicitly provided, it will use that date 
regardless of catchup setting
-        expected_date = logical_date if logical_date is not None else 
filename_rendering_ti.logical_date
+        expected_date = logical_date if logical_date is not None else 
filename_rendering_ti.run_after
 
         expected_filename = (
             
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
diff --git a/devel-common/src/tests_common/pytest_plugin.py 
b/devel-common/src/tests_common/pytest_plugin.py
index baed8ca2ca8..7f7ac0c4a3f 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -784,6 +784,7 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
         (want_activate_assets,) = serialized_marker.args or (True,)
 
     from airflow.utils.log.logging_mixin import LoggingMixin
+    from airflow.utils.types import NOTSET
 
     class DagFactory(LoggingMixin, DagMaker):
         _own_session = False
@@ -901,10 +902,10 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
             else:
                 self._bag_dag_compat(self.dag)
 
-        def create_dagrun(self, *, logical_date=None, **kwargs):
+        def create_dagrun(self, *, logical_date=NOTSET, **kwargs):
             from airflow.utils import timezone
             from airflow.utils.state import DagRunState
-            from airflow.utils.types import NOTSET, DagRunType
+            from airflow.utils.types import DagRunType
 
             if AIRFLOW_V_3_0_PLUS:
                 from airflow.utils.types import DagRunTriggeredByType
@@ -932,10 +933,10 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
             if not isinstance(run_type, DagRunType):
                 run_type = DagRunType(run_type)
 
-            if logical_date is NOTSET:
+            if logical_date is None:
                 # Explicit non requested
                 logical_date = None
-            elif logical_date is None:
+            elif logical_date is NOTSET:
                 if run_type == DagRunType.MANUAL:
                     logical_date = self.start_date
                 else:
@@ -1227,7 +1228,7 @@ class CreateTaskInstance(Protocol):
     def __call__(
         self,
         *,
-        logical_date: datetime = ...,
+        logical_date: datetime | None = ...,
         dagrun_state: DagRunState = ...,
         state: TaskInstanceState = ...,
         run_id: str = ...,
@@ -1366,11 +1367,13 @@ class CreateTaskInstanceOfOperator(Protocol):
 
 @pytest.fixture
 def create_serialized_task_instance_of_operator(dag_maker: DagMaker) -> 
CreateTaskInstanceOfOperator:
+    from airflow.utils.types import NOTSET
+
     def _create_task_instance(
         operator_class,
         *,
         dag_id,
-        logical_date=None,
+        logical_date=NOTSET,
         session=None,
         **operator_kwargs,
     ) -> TaskInstance:
@@ -1384,11 +1387,13 @@ def 
create_serialized_task_instance_of_operator(dag_maker: DagMaker) -> CreateTa
 
 @pytest.fixture
 def create_task_instance_of_operator(dag_maker: DagMaker) -> 
CreateTaskInstanceOfOperator:
+    from airflow.utils.types import NOTSET
+
     def _create_task_instance(
         operator_class,
         *,
         dag_id,
-        logical_date=None,
+        logical_date=NOTSET,
         session=None,
         **operator_kwargs,
     ) -> TaskInstance:

Reply via email to