Copilot commented on code in PR #59764:
URL: https://github.com/apache/airflow/pull/59764#discussion_r2820442921
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
# These changes are committed by the calling function.
+def _get_new_tasks(
+ dag_id: str,
+ run_id: str,
+ session: Session,
+) -> Collection[str | tuple[str, int]]:
+ """
+ Get task instances for newly added tasks in the latest DAG version.
+
+ Updates the DAG run to the latest version if new tasks are found and
+ creates task instances for them via verify_integrity.
+
+ :param run_id: The run_id for the DAG run
+ :param dag_id: The dag_id for the DAG
+ :param session: SQLAlchemy session
+ :return: List of task instances for newly added tasks
Review Comment:
The `:return:` line of the docstring (line 244) says it returns "List of task
instances for newly added tasks", but the function actually returns a list of
task IDs (strings), not TaskInstance objects. The return type annotation
correctly shows `Collection[str | tuple[str, int]]`, so the docstring
description should be updated to say "List of task IDs for newly added tasks"
or "Collection of task IDs or (task_id, map_index) tuples for newly added
tasks". The summary line ("Get task instances for newly added tasks in the
latest DAG version.") has the same inaccuracy and should also refer to task IDs.
##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1008,7 +1014,12 @@ def clear(
tuples that should not be cleared
:param exclude_run_ids: A set of ``run_id`` or (``run_id``)
"""
- from airflow.models.taskinstance import clear_task_instances
+ from airflow.models.taskinstance import _get_new_tasks,
clear_task_instances
+
+ if only_new:
Review Comment:
When `only_new=True`, the code unconditionally overwrites the `task_ids`
parameter on line 1022. This means that if a user provides both `task_ids` and
`only_new=True`, the provided `task_ids` are silently ignored, which could
lead to unexpected behavior.
Consider making `only_new` and `task_ids` mutually exclusive (raising a
`ValueError` if both are provided), or clearly documenting that `only_new`
takes precedence. The validation should check `if only_new and task_ids is not
None:`.
```suggestion
if only_new:
if task_ids is not None:
raise ValueError("only_new and task_ids are mutually
exclusive")
```
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
# These changes are committed by the calling function.
+def _get_new_tasks(
+ dag_id: str,
+ run_id: str,
+ session: Session,
+) -> Collection[str | tuple[str, int]]:
+ """
+ Get task instances for newly added tasks in the latest DAG version.
+
+ Updates the DAG run to the latest version if new tasks are found and
+ creates task instances for them via verify_integrity.
+
+ :param run_id: The run_id for the DAG run
+ :param dag_id: The dag_id for the DAG
Review Comment:
The docstring parameter order (lines 241-243) doesn't match the function
signature parameter order (lines 231-233). The function signature has `dag_id,
run_id, session` but the docstring lists them as `run_id, dag_id, session`. The
docstring should match the signature order for consistency.
```suggestion
:param dag_id: The dag_id for the DAG
:param run_id: The run_id for the DAG run
```
##########
airflow-core/tests/unit/models/test_cleartasks.py:
##########
@@ -738,3 +738,172 @@ def
test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver
assert TaskInstanceState.REMOVED not in [ti.state for ti in
dr.task_instances]
for ti in dr.task_instances:
assert ti.dag_version_id == old_dag_version.id
+
+ def test_clear_only_new_tasks(self, dag_maker, session):
+ """Test that only_new queues only newly added tasks without clearing
existing ones."""
+
+ with dag_maker(
+ "test_clear_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v1",
+ ) as dag:
+ task0 = EmptyOperator(task_id="0")
+ task1 = EmptyOperator(task_id="1")
+ dr = dag_maker.create_dagrun(
+ state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+ ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+ ti0.refresh_from_task(dag.get_task("0"))
+ ti1.refresh_from_task(dag.get_task("1"))
+
+ run_task_instance(ti0, task0)
+ run_task_instance(ti1, task1)
+ dr.state = DagRunState.SUCCESS
+ session.merge(dr)
+ session.flush()
+
+ with dag_maker(
+ "test_clear_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v2",
+ ) as dag:
+ EmptyOperator(task_id="0")
+ EmptyOperator(task_id="1")
+ EmptyOperator(task_id="2")
+ EmptyOperator(task_id="3")
+
+ new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+ assert old_dag_version.id != new_dag_version.id
+
+ count = dag.clear(
+ run_id=dr.run_id,
+ only_new=True,
+ session=session,
+ )
+
+ session.flush()
+ dr.refresh_from_db(session)
+
+ # Should return 2 new tasks
+ assert count == 2
+
+ def test_clear_only_new_tasks_dry_run(self, dag_maker, session):
+ """Test that only_new with dry_run returns new tasks without making
changes."""
+ with dag_maker(
+ "test_clear_new_task_instances_dry_run",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v1",
+ ) as dag:
+ task0 = EmptyOperator(task_id="0")
+ task1 = EmptyOperator(task_id="1")
+ dr = dag_maker.create_dagrun(
+ state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+ ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+ ti0.refresh_from_task(dag.get_task("0"))
+ ti1.refresh_from_task(dag.get_task("1"))
+
+ run_task_instance(ti0, task0)
+ run_task_instance(ti1, task1)
+ dr.state = DagRunState.SUCCESS
+ session.merge(dr)
+ session.flush()
+
+ with dag_maker(
+ "test_clear_new_task_instances_dry_run",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v2",
+ ) as dag:
+ EmptyOperator(task_id="0")
+ EmptyOperator(task_id="1")
+ EmptyOperator(task_id="2")
+ EmptyOperator(task_id="3")
+
+ new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+ assert old_dag_version.id != new_dag_version.id
+
+ # Dry run - should return list of new tasks without making changes
+ new_tis = dag.clear(
+ run_id=dr.run_id,
+ only_new=True,
+ dry_run=True,
+ session=session,
+ )
+
+ session.flush()
+ dr.refresh_from_db(session)
+
+ # Check if correct tasks are returned
+ assert len(new_tis) == 2
+ assert [new_tis[0].task_id, new_tis[1].task_id] == ["2", "3"]
+ assert [new_tis[0].state, new_tis[1].state] == [None, None]
+
+ # Verify no changes were made to the database
+ assert dr.created_dag_version_id == old_dag_version.id
+ assert len(dr.task_instances) == 2 # should be only the 2 earlier
tasks
+
+ def test_clear_only_new_no_new_tasks(self, dag_maker, session):
+ """Test that only_new returns 0 when no new tasks are added."""
+ with dag_maker(
+ "test_clear_no_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v1",
+ ) as dag:
+ task0 = EmptyOperator(task_id="0")
+ task1 = EmptyOperator(task_id="1")
+ dr = dag_maker.create_dagrun(
+ state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+ ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+ ti0.refresh_from_task(dag.get_task("0"))
+ ti1.refresh_from_task(dag.get_task("1"))
+
+ run_task_instance(ti0, task0)
+ run_task_instance(ti1, task1)
+ dr.state = DagRunState.SUCCESS
+ session.merge(dr)
+ session.flush()
+
+ with dag_maker(
+ "test_clear_no_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v2",
+ ) as dag:
+ EmptyOperator(task_id="0")
+ EmptyOperator(task_id="1")
+
+ new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+ assert old_dag_version.id != new_dag_version.id
+
+ # Clear with only_new should return 0
+ count = dag.clear(
+ run_id=dr.run_id,
+ only_new=True,
+ session=session,
+ )
+
+ assert count == 0
Review Comment:
The test coverage is missing a test case to verify that providing
`only_new=True` without `run_id` raises a `ValueError` with the message
"only_new requires run_id to be specified". This validation is implemented in
the code but not tested.
```suggestion
assert count == 0
def test_clear_only_new_without_run_id_raises(self, dag_maker, session):
"""Test that only_new=True without run_id raises a ValueError."""
with dag_maker(
"test_clear_only_new_without_run_id",
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=1),
catchup=True,
bundle_version="v1",
) as dag:
EmptyOperator(task_id="task")
dag_maker.create_dagrun(
state=State.RUNNING,
run_type=DagRunType.SCHEDULED,
)
with pytest.raises(ValueError, match="only_new requires run_id to be
specified"):
dag.clear(
only_new=True,
session=session,
)
```
##########
airflow-core/tests/unit/models/test_cleartasks.py:
##########
@@ -738,3 +738,172 @@ def
test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver
assert TaskInstanceState.REMOVED not in [ti.state for ti in
dr.task_instances]
for ti in dr.task_instances:
assert ti.dag_version_id == old_dag_version.id
+
+ def test_clear_only_new_tasks(self, dag_maker, session):
+ """Test that only_new queues only newly added tasks without clearing
existing ones."""
+
+ with dag_maker(
+ "test_clear_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v1",
+ ) as dag:
+ task0 = EmptyOperator(task_id="0")
+ task1 = EmptyOperator(task_id="1")
+ dr = dag_maker.create_dagrun(
+ state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+ ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+ ti0.refresh_from_task(dag.get_task("0"))
+ ti1.refresh_from_task(dag.get_task("1"))
+
+ run_task_instance(ti0, task0)
+ run_task_instance(ti1, task1)
+ dr.state = DagRunState.SUCCESS
+ session.merge(dr)
+ session.flush()
+
+ with dag_maker(
+ "test_clear_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v2",
+ ) as dag:
+ EmptyOperator(task_id="0")
+ EmptyOperator(task_id="1")
+ EmptyOperator(task_id="2")
+ EmptyOperator(task_id="3")
+
+ new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+ assert old_dag_version.id != new_dag_version.id
+
+ count = dag.clear(
+ run_id=dr.run_id,
+ only_new=True,
+ session=session,
+ )
+
+ session.flush()
+ dr.refresh_from_db(session)
+
+ # Should return 2 new tasks
+ assert count == 2
+
Review Comment:
The test verifies that 2 new tasks are returned, but it doesn't verify
important aspects of the functionality:
1. That the existing task instances (tasks "0" and "1") remain in their
SUCCESS state and are not cleared
2. That the new task instances (tasks "2" and "3") are actually created in
the database with the correct state
3. That the dag_run's `created_dag_version_id` is updated to the new version
4. That the dag_run's state is set to QUEUED (the default value of the
`dag_run_state` parameter)
These assertions would provide more comprehensive coverage of the only_new
functionality.
```suggestion
# Existing task instances should remain in SUCCESS state and not be
cleared
ti0_after, ti1_after = sorted(
(
session.execute(
select(TaskInstance).where(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id == dr.run_id,
TaskInstance.task_id == task_id,
)
)
.scalars()
.one()
for task_id in ["0", "1"]
),
key=lambda ti: ti.task_id,
)
assert ti0_after.state == TaskInstanceState.SUCCESS
assert ti1_after.state == TaskInstanceState.SUCCESS
# New task instances for tasks "2" and "3" should be created in the
DB
ti2, ti3 = sorted(
(
session.execute(
select(TaskInstance).where(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id == dr.run_id,
TaskInstance.task_id == task_id,
)
)
.scalars()
.one()
for task_id in ["2", "3"]
),
key=lambda ti: ti.task_id,
)
assert ti2.state == TaskInstanceState.NONE
assert ti3.state == TaskInstanceState.NONE
# DagRun should be updated to the new dag version and queued
assert dr.created_dag_version_id == new_dag_version.id
assert dr.state == DagRunState.QUEUED
```
##########
airflow-core/tests/unit/models/test_cleartasks.py:
##########
@@ -738,3 +738,172 @@ def
test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver
assert TaskInstanceState.REMOVED not in [ti.state for ti in
dr.task_instances]
for ti in dr.task_instances:
assert ti.dag_version_id == old_dag_version.id
+
+ def test_clear_only_new_tasks(self, dag_maker, session):
+ """Test that only_new queues only newly added tasks without clearing
existing ones."""
+
+ with dag_maker(
+ "test_clear_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v1",
+ ) as dag:
+ task0 = EmptyOperator(task_id="0")
+ task1 = EmptyOperator(task_id="1")
+ dr = dag_maker.create_dagrun(
+ state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+ ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+ ti0.refresh_from_task(dag.get_task("0"))
+ ti1.refresh_from_task(dag.get_task("1"))
+
+ run_task_instance(ti0, task0)
+ run_task_instance(ti1, task1)
+ dr.state = DagRunState.SUCCESS
+ session.merge(dr)
+ session.flush()
+
+ with dag_maker(
+ "test_clear_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v2",
+ ) as dag:
+ EmptyOperator(task_id="0")
+ EmptyOperator(task_id="1")
+ EmptyOperator(task_id="2")
+ EmptyOperator(task_id="3")
+
+ new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+ assert old_dag_version.id != new_dag_version.id
+
+ count = dag.clear(
+ run_id=dr.run_id,
+ only_new=True,
+ session=session,
+ )
+
+ session.flush()
+ dr.refresh_from_db(session)
+
+ # Should return 2 new tasks
+ assert count == 2
+
+ def test_clear_only_new_tasks_dry_run(self, dag_maker, session):
+ """Test that only_new with dry_run returns new tasks without making
changes."""
+ with dag_maker(
+ "test_clear_new_task_instances_dry_run",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v1",
+ ) as dag:
+ task0 = EmptyOperator(task_id="0")
+ task1 = EmptyOperator(task_id="1")
+ dr = dag_maker.create_dagrun(
+ state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+ ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+ ti0.refresh_from_task(dag.get_task("0"))
+ ti1.refresh_from_task(dag.get_task("1"))
+
+ run_task_instance(ti0, task0)
+ run_task_instance(ti1, task1)
+ dr.state = DagRunState.SUCCESS
+ session.merge(dr)
+ session.flush()
+
+ with dag_maker(
+ "test_clear_new_task_instances_dry_run",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v2",
+ ) as dag:
+ EmptyOperator(task_id="0")
+ EmptyOperator(task_id="1")
+ EmptyOperator(task_id="2")
+ EmptyOperator(task_id="3")
+
+ new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+ assert old_dag_version.id != new_dag_version.id
+
+ # Dry run - should return list of new tasks without making changes
+ new_tis = dag.clear(
+ run_id=dr.run_id,
+ only_new=True,
+ dry_run=True,
+ session=session,
+ )
+
+ session.flush()
+ dr.refresh_from_db(session)
+
+ # Check if correct tasks are returned
+ assert len(new_tis) == 2
+ assert [new_tis[0].task_id, new_tis[1].task_id] == ["2", "3"]
+ assert [new_tis[0].state, new_tis[1].state] == [None, None]
+
+ # Verify no changes were made to the database
+ assert dr.created_dag_version_id == old_dag_version.id
+ assert len(dr.task_instances) == 2 # should be only the 2 earlier
tasks
+
+ def test_clear_only_new_no_new_tasks(self, dag_maker, session):
+ """Test that only_new returns 0 when no new tasks are added."""
+ with dag_maker(
+ "test_clear_no_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v1",
+ ) as dag:
+ task0 = EmptyOperator(task_id="0")
+ task1 = EmptyOperator(task_id="1")
+ dr = dag_maker.create_dagrun(
+ state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+ ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+ ti0.refresh_from_task(dag.get_task("0"))
+ ti1.refresh_from_task(dag.get_task("1"))
+
+ run_task_instance(ti0, task0)
+ run_task_instance(ti1, task1)
+ dr.state = DagRunState.SUCCESS
+ session.merge(dr)
+ session.flush()
+
+ with dag_maker(
+ "test_clear_no_new_task_instances",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v2",
+ ) as dag:
+ EmptyOperator(task_id="0")
+ EmptyOperator(task_id="1")
+
+ new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+ assert old_dag_version.id != new_dag_version.id
+
+ # Clear with only_new should return 0
+ count = dag.clear(
+ run_id=dr.run_id,
+ only_new=True,
+ session=session,
+ )
+
+ assert count == 0
Review Comment:
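The test coverage is missing a test case for when both `task_ids` and
`only_new=True` are provided to `dag.clear()`. Such a test would pin down the
expected behavior (whether the combination should raise an error or whether
one parameter takes precedence). Note that the suggested test below assumes
`task_ids` is silently ignored and expects a count of 0. If the earlier
suggestion to make `only_new` and `task_ids` mutually exclusive is adopted,
the test should instead assert that a `ValueError` is raised.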
```suggestion
assert count == 0
def test_clear_only_new_with_task_ids_no_new_tasks(self, dag_maker,
session):
"""Test that only_new with task_ids returns 0 when no new tasks are
added."""
with dag_maker(
"test_clear_no_new_task_instances",
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10),
catchup=True,
bundle_version="v1",
) as dag:
task0 = EmptyOperator(task_id="0")
task1 = EmptyOperator(task_id="1")
dr = dag_maker.create_dagrun(
state=State.RUNNING,
run_type=DagRunType.SCHEDULED,
)
old_dag_version = DagVersion.get_latest_version(dr.dag_id)
ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
ti0.refresh_from_task(dag.get_task("0"))
ti1.refresh_from_task(dag.get_task("1"))
run_task_instance(ti0, task0)
run_task_instance(ti1, task1)
dr.state = DagRunState.SUCCESS
session.merge(dr)
session.flush()
with dag_maker(
"test_clear_no_new_task_instances",
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10),
catchup=True,
bundle_version="v2",
) as dag:
EmptyOperator(task_id="0")
EmptyOperator(task_id="1")
new_dag_version = DagVersion.get_latest_version(dag.dag_id)
assert old_dag_version.id != new_dag_version.id
# Clear with only_new and task_ids should return 0
count = dag.clear(
run_id=dr.run_id,
task_ids=["0", "1"],
only_new=True,
session=session,
)
assert count == 0
```
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
# These changes are committed by the calling function.
+def _get_new_tasks(
+ dag_id: str,
+ run_id: str,
+ session: Session,
+) -> Collection[str | tuple[str, int]]:
+ """
+ Get task instances for newly added tasks in the latest DAG version.
+
+ Updates the DAG run to the latest version if new tasks are found and
+ creates task instances for them via verify_integrity.
+
+ :param run_id: The run_id for the DAG run
+ :param dag_id: The dag_id for the DAG
+ :param session: SQLAlchemy session
+ :return: List of task instances for newly added tasks
+ """
+ from airflow.models.dagbag import DBDagBag
+ from airflow.models.dagrun import DagRun
+
+ dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id,
run_id=run_id))
+ if not dag_run:
+ raise ValueError(f"DagRun with run_id '{run_id}' not found")
+
+ scheduler_dagbag = DBDagBag(load_op_links=False)
+ latest_dag = scheduler_dagbag.get_latest_version_of_dag(dag_id,
session=session)
+
+ if not latest_dag:
+ raise ValueError(f"Latest DAG version for '{dag_id}' not found")
+
+ current_dag = scheduler_dagbag.get_dag_for_run(dag_run=dag_run,
session=session)
+ new_task_ids = set(latest_dag.task_ids) - set(current_dag.task_ids) if
current_dag else set()
Review Comment:
When `current_dag` is `None` (line 260), the function treats the run as
having no new tasks (`new_task_ids` falls back to an empty set), even though
there's a valid `latest_dag`. This edge case might occur if the DagRun's
version is no longer available. Consider logging a warning in this scenario
to help users debug why no tasks are being queued, or raising an exception if
this is an unexpected state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]