This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 25c4b6bf1be Fix bug in `previous-successful-dagrun` Execution API
endpoint (#48520)
25c4b6bf1be is described below
commit 25c4b6bf1bed14d15122f2fe8bffef0ed3767bb0
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Mar 29 18:29:23 2025 +0530
Fix bug in `previous-successful-dagrun` Execution API endpoint (#48520)
The bug was as mentioned in https://github.com/apache/airflow/issues/48503
where the `logical_date` of the current TI was None.
We should return an empty `PrevSuccessfulDagRunResponse()` but because we
weren't checking for it, the API server failed with:
```
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:1160:
in test_ti_with_none_as_logical_date
response =
client.get(f"/execution/task-instances/{ti.id}/previous-successful-dagrun")
/usr/local/lib/python3.10/site-packages/starlette/testclient.py:465: in get
return super().get(
...
...
/usr/local/lib/python3.10/site-packages/cadwyn/schema_generation.py:504: in
__call__
return self._original_callable(*args, **kwargs)
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:555:
in get_previous_successful_dagrun
DR.logical_date < task_instance.logical_date,
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/operators.py:368: in
__lt__
return self.operate(lt, other)
/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:322:
in operate
return op(self.comparator, *other, **kwargs)
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/operators.py:368: in
__lt__
return self.operate(lt, other)
/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/properties.py:426:
in operate
return op(self.__clause_element__(), *other, **kwargs)
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/operators.py:368: in
__lt__
return self.operate(lt, other)
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:873: in
operate
return op(self.comparator, *other, **kwargs)
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/operators.py:368: in
__lt__
return self.operate(lt, other)
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/type_api.py:1379: in
operate
return super(TypeDecorator.Comparator, self).operate(
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/type_api.py:77: in
operate
return o[0](self.expr, op, *(other + o[1:]), **kwargs)
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/default_comparator.py:95:
in _boolean_compare
raise exc.ArgumentError(
E sqlalchemy.exc.ArgumentError: Only '=', '!=', 'is_()', 'is_not()',
'is_distinct_from()', 'is_not_distinct_from()' operators can be used with
None/True/False
```
---
.../execution_api/routes/task_instances.py | 2 +-
.../versions/head/test_task_instances.py | 33 ++++++++++++++++++++++
airflow-core/tests/unit/utils/test_log_handlers.py | 6 ++--
devel-common/src/tests_common/pytest_plugin.py | 19 ++++++++-----
4 files changed, 49 insertions(+), 11 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index e78e2d41063..ff0ca516314 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -545,7 +545,7 @@ def get_previous_successful_dagrun(
"""
ti_id_str = str(task_instance_id)
task_instance = session.scalar(select(TI).where(TI.id == ti_id_str))
- if not task_instance:
+ if not task_instance or not task_instance.logical_date:
return PrevSuccessfulDagRunResponse()
dag_run = session.scalar(
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index feaa508a87b..81c99b271be 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1132,6 +1132,39 @@ class TestPreviousDagRun:
"end_date": None,
}
+ def test_ti_with_none_as_logical_date(self, client, session,
create_task_instance, dag_maker):
+ ti = create_task_instance(
+ task_id="test_ti_with_none_as_logical_date",
+ dag_id="test_dag",
+ logical_date=None,
+ state=State.RUNNING,
+ start_date=timezone.datetime(2024, 1, 17),
+ session=session,
+ )
+ session.commit()
+
+ assert ti.logical_date is None
+
+ dr1 = dag_maker.create_dagrun(
+ run_id="test_ti_with_none_as_logical_date",
+ logical_date=timezone.datetime(2025, 1, 17),
+ run_type="scheduled",
+ state=State.SUCCESS,
+ session=session,
+ )
+ dr1.end_date = timezone.datetime(2025, 1, 17, 1, 0, 0)
+
+ session.commit()
+
+ response =
client.get(f"/execution/task-instances/{ti.id}/previous-successful-dagrun")
+ assert response.status_code == 200
+ assert response.json() == {
+ "data_interval_start": None,
+ "data_interval_end": None,
+ "start_date": None,
+ "end_date": None,
+ }
+
class TestGetRescheduleStartDate:
def test_get_start_date(self, client, session, create_task_instance):
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py
b/airflow-core/tests/unit/utils/test_log_handlers.py
index f94c83fe92b..72726cd9401 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -549,7 +549,7 @@ class TestFilenameRendering:
# With catchup=False:
# - If logical_date is None, it will use current date as the logical
date
# - If logical_date is explicitly provided, it will use that date
regardless of catchup setting
- expected_date = logical_date if logical_date is not None else
filename_rendering_ti.logical_date
+ expected_date = logical_date if logical_date is not None else
filename_rendering_ti.run_after
expected_filename = (
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
@@ -557,7 +557,7 @@ class TestFilenameRendering:
)
fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
- assert expected_filename == rendered_filename
+ assert rendered_filename == expected_filename
def test_jinja_rendering(self, create_log_template, create_task_instance,
logical_date):
create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{
try_number }}.log")
@@ -592,7 +592,7 @@ class TestFilenameRendering:
# With catchup=False:
# - If logical_date is None, it will use current date as the logical
date
# - If logical_date is explicitly provided, it will use that date
regardless of catchup setting
- expected_date = logical_date if logical_date is not None else
filename_rendering_ti.logical_date
+ expected_date = logical_date if logical_date is not None else
filename_rendering_ti.run_after
expected_filename = (
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
diff --git a/devel-common/src/tests_common/pytest_plugin.py
b/devel-common/src/tests_common/pytest_plugin.py
index baed8ca2ca8..7f7ac0c4a3f 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -784,6 +784,7 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
(want_activate_assets,) = serialized_marker.args or (True,)
from airflow.utils.log.logging_mixin import LoggingMixin
+ from airflow.utils.types import NOTSET
class DagFactory(LoggingMixin, DagMaker):
_own_session = False
@@ -901,10 +902,10 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
else:
self._bag_dag_compat(self.dag)
- def create_dagrun(self, *, logical_date=None, **kwargs):
+ def create_dagrun(self, *, logical_date=NOTSET, **kwargs):
from airflow.utils import timezone
from airflow.utils.state import DagRunState
- from airflow.utils.types import NOTSET, DagRunType
+ from airflow.utils.types import DagRunType
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.types import DagRunTriggeredByType
@@ -932,10 +933,10 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
if not isinstance(run_type, DagRunType):
run_type = DagRunType(run_type)
- if logical_date is NOTSET:
+ if logical_date is None:
# Explicit non requested
logical_date = None
- elif logical_date is None:
+ elif logical_date is NOTSET:
if run_type == DagRunType.MANUAL:
logical_date = self.start_date
else:
@@ -1227,7 +1228,7 @@ class CreateTaskInstance(Protocol):
def __call__(
self,
*,
- logical_date: datetime = ...,
+ logical_date: datetime | None = ...,
dagrun_state: DagRunState = ...,
state: TaskInstanceState = ...,
run_id: str = ...,
@@ -1366,11 +1367,13 @@ class CreateTaskInstanceOfOperator(Protocol):
@pytest.fixture
def create_serialized_task_instance_of_operator(dag_maker: DagMaker) ->
CreateTaskInstanceOfOperator:
+ from airflow.utils.types import NOTSET
+
def _create_task_instance(
operator_class,
*,
dag_id,
- logical_date=None,
+ logical_date=NOTSET,
session=None,
**operator_kwargs,
) -> TaskInstance:
@@ -1384,11 +1387,13 @@ def
create_serialized_task_instance_of_operator(dag_maker: DagMaker) -> CreateTa
@pytest.fixture
def create_task_instance_of_operator(dag_maker: DagMaker) ->
CreateTaskInstanceOfOperator:
+ from airflow.utils.types import NOTSET
+
def _create_task_instance(
operator_class,
*,
dag_id,
- logical_date=None,
+ logical_date=NOTSET,
session=None,
**operator_kwargs,
) -> TaskInstance: