Copilot commented on code in PR #59764:
URL: https://github.com/apache/airflow/pull/59764#discussion_r2820442921


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
     # These changes are committed by the calling function.
 
 
+def _get_new_tasks(
+    dag_id: str,
+    run_id: str,
+    session: Session,
+) -> Collection[str | tuple[str, int]]:
+    """
+    Get task instances for newly added tasks in the latest DAG version.
+
+    Updates the DAG run to the latest version if new tasks are found and
+    creates task instances for them via verify_integrity.
+
+    :param run_id: The run_id for the DAG run
+    :param dag_id: The dag_id for the DAG
+    :param session: SQLAlchemy session
+    :return: List of task instances for newly added tasks

Review Comment:
   The docstring at line 244 says it returns "List of task instances for newly 
added tasks", but the function actually returns a list of task IDs (strings), 
not TaskInstance objects. The return type annotation correctly shows 
`Collection[str | tuple[str, int]]`, but the docstring description should be 
updated to say "List of task IDs for newly added tasks" or "Collection of task 
IDs or (task_id, map_index) tuples for newly added tasks".



##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1008,7 +1014,12 @@ def clear(
             tuples that should not be cleared
         :param exclude_run_ids: A set of ``run_id`` or (``run_id``)
         """
-        from airflow.models.taskinstance import clear_task_instances
+        from airflow.models.taskinstance import _get_new_tasks, 
clear_task_instances
+
+        if only_new:

Review Comment:
   When `only_new=True`, the code unconditionally overwrites the `task_ids` 
parameter on line 1022. This means if a user provides both `task_ids` and 
`only_new=True`, the provided `task_ids` will be silently ignored. This could 
lead to unexpected behavior.
   
   Consider adding validation to make `only_new` and `task_ids` mutually 
exclusive (raising a `ValueError` if both are provided), or document this 
behavior clearly. The validation should check `if only_new and task_ids is not 
None:`.
   ```suggestion
           if only_new:
               if task_ids is not None:
                   raise ValueError("only_new and task_ids are mutually 
exclusive")
   ```



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
     # These changes are committed by the calling function.
 
 
+def _get_new_tasks(
+    dag_id: str,
+    run_id: str,
+    session: Session,
+) -> Collection[str | tuple[str, int]]:
+    """
+    Get task instances for newly added tasks in the latest DAG version.
+
+    Updates the DAG run to the latest version if new tasks are found and
+    creates task instances for them via verify_integrity.
+
+    :param run_id: The run_id for the DAG run
+    :param dag_id: The dag_id for the DAG

Review Comment:
   The docstring parameter order (lines 241-243) doesn't match the function 
signature parameter order (lines 231-233). The function signature has `dag_id, 
run_id, session` but the docstring lists them as `run_id, dag_id, session`. The 
docstring should match the signature order for consistency.
   ```suggestion
       :param dag_id: The dag_id for the DAG
       :param run_id: The run_id for the DAG run
   ```



##########
airflow-core/tests/unit/models/test_cleartasks.py:
##########
@@ -738,3 +738,172 @@ def 
test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver
             assert TaskInstanceState.REMOVED not in [ti.state for ti in 
dr.task_instances]
             for ti in dr.task_instances:
                 assert ti.dag_version_id == old_dag_version.id
+
+    def test_clear_only_new_tasks(self, dag_maker, session):
+        """Test that only_new queues only newly added tasks without clearing 
existing ones."""
+
+        with dag_maker(
+            "test_clear_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v1",
+        ) as dag:
+            task0 = EmptyOperator(task_id="0")
+            task1 = EmptyOperator(task_id="1")
+        dr = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+        ti0.refresh_from_task(dag.get_task("0"))
+        ti1.refresh_from_task(dag.get_task("1"))
+
+        run_task_instance(ti0, task0)
+        run_task_instance(ti1, task1)
+        dr.state = DagRunState.SUCCESS
+        session.merge(dr)
+        session.flush()
+
+        with dag_maker(
+            "test_clear_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v2",
+        ) as dag:
+            EmptyOperator(task_id="0")
+            EmptyOperator(task_id="1")
+            EmptyOperator(task_id="2")
+            EmptyOperator(task_id="3")
+
+        new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        assert old_dag_version.id != new_dag_version.id
+
+        count = dag.clear(
+            run_id=dr.run_id,
+            only_new=True,
+            session=session,
+        )
+
+        session.flush()
+        dr.refresh_from_db(session)
+
+        # Should return 2 new tasks
+        assert count == 2
+
+    def test_clear_only_new_tasks_dry_run(self, dag_maker, session):
+        """Test that only_new with dry_run returns new tasks without making 
changes."""
+        with dag_maker(
+            "test_clear_new_task_instances_dry_run",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v1",
+        ) as dag:
+            task0 = EmptyOperator(task_id="0")
+            task1 = EmptyOperator(task_id="1")
+        dr = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+        ti0.refresh_from_task(dag.get_task("0"))
+        ti1.refresh_from_task(dag.get_task("1"))
+
+        run_task_instance(ti0, task0)
+        run_task_instance(ti1, task1)
+        dr.state = DagRunState.SUCCESS
+        session.merge(dr)
+        session.flush()
+
+        with dag_maker(
+            "test_clear_new_task_instances_dry_run",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v2",
+        ) as dag:
+            EmptyOperator(task_id="0")
+            EmptyOperator(task_id="1")
+            EmptyOperator(task_id="2")
+            EmptyOperator(task_id="3")
+
+        new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        assert old_dag_version.id != new_dag_version.id
+
+        # Dry run - should return list of new tasks without making changes
+        new_tis = dag.clear(
+            run_id=dr.run_id,
+            only_new=True,
+            dry_run=True,
+            session=session,
+        )
+
+        session.flush()
+        dr.refresh_from_db(session)
+
+        # Check if correct tasks are returned
+        assert len(new_tis) == 2
+        assert [new_tis[0].task_id, new_tis[1].task_id] == ["2", "3"]
+        assert [new_tis[0].state, new_tis[1].state] == [None, None]
+
+        # Verify no changes were made to the database
+        assert dr.created_dag_version_id == old_dag_version.id
+        assert len(dr.task_instances) == 2  # should be only the 2 earlier 
tasks
+
+    def test_clear_only_new_no_new_tasks(self, dag_maker, session):
+        """Test that only_new returns 0 when no new tasks are added."""
+        with dag_maker(
+            "test_clear_no_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v1",
+        ) as dag:
+            task0 = EmptyOperator(task_id="0")
+            task1 = EmptyOperator(task_id="1")
+        dr = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+        ti0.refresh_from_task(dag.get_task("0"))
+        ti1.refresh_from_task(dag.get_task("1"))
+
+        run_task_instance(ti0, task0)
+        run_task_instance(ti1, task1)
+        dr.state = DagRunState.SUCCESS
+        session.merge(dr)
+        session.flush()
+
+        with dag_maker(
+            "test_clear_no_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v2",
+        ) as dag:
+            EmptyOperator(task_id="0")
+            EmptyOperator(task_id="1")
+
+        new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        assert old_dag_version.id != new_dag_version.id
+
+        # Clear with only_new should return 0
+        count = dag.clear(
+            run_id=dr.run_id,
+            only_new=True,
+            session=session,
+        )
+
+        assert count == 0

Review Comment:
   The test coverage is missing a test case to verify that providing 
`only_new=True` without `run_id` raises a `ValueError` with the message 
"only_new requires run_id to be specified". This validation is implemented in 
the code but not tested.
   ```suggestion
           assert count == 0
   
       def test_clear_only_new_without_run_id_raises(self, dag_maker, session):
           """Test that only_new=True without run_id raises a ValueError."""
           with dag_maker(
               "test_clear_only_new_without_run_id",
               start_date=DEFAULT_DATE,
               end_date=DEFAULT_DATE + datetime.timedelta(days=1),
               catchup=True,
               bundle_version="v1",
           ) as dag:
               EmptyOperator(task_id="task")
   
           dag_maker.create_dagrun(
               state=State.RUNNING,
               run_type=DagRunType.SCHEDULED,
           )
   
           with pytest.raises(ValueError, match="only_new requires run_id to be 
specified"):
               dag.clear(
                   only_new=True,
                   session=session,
               )
   ```



##########
airflow-core/tests/unit/models/test_cleartasks.py:
##########
@@ -738,3 +738,172 @@ def 
test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver
             assert TaskInstanceState.REMOVED not in [ti.state for ti in 
dr.task_instances]
             for ti in dr.task_instances:
                 assert ti.dag_version_id == old_dag_version.id
+
+    def test_clear_only_new_tasks(self, dag_maker, session):
+        """Test that only_new queues only newly added tasks without clearing 
existing ones."""
+
+        with dag_maker(
+            "test_clear_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v1",
+        ) as dag:
+            task0 = EmptyOperator(task_id="0")
+            task1 = EmptyOperator(task_id="1")
+        dr = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+        ti0.refresh_from_task(dag.get_task("0"))
+        ti1.refresh_from_task(dag.get_task("1"))
+
+        run_task_instance(ti0, task0)
+        run_task_instance(ti1, task1)
+        dr.state = DagRunState.SUCCESS
+        session.merge(dr)
+        session.flush()
+
+        with dag_maker(
+            "test_clear_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v2",
+        ) as dag:
+            EmptyOperator(task_id="0")
+            EmptyOperator(task_id="1")
+            EmptyOperator(task_id="2")
+            EmptyOperator(task_id="3")
+
+        new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        assert old_dag_version.id != new_dag_version.id
+
+        count = dag.clear(
+            run_id=dr.run_id,
+            only_new=True,
+            session=session,
+        )
+
+        session.flush()
+        dr.refresh_from_db(session)
+
+        # Should return 2 new tasks
+        assert count == 2
+

Review Comment:
   The test verifies that 2 new tasks are returned, but it doesn't verify 
important aspects of the functionality:
   1. That the existing task instances (tasks "0" and "1") remain in their 
SUCCESS state and are not cleared
   2. That the new task instances (tasks "2" and "3") are actually created in 
the database with the correct state
   3. That the dag_run's `created_dag_version_id` is updated to the new version
   4. That the dag_run's state is set to QUEUED (the default for dag_run_state 
parameter)
   
   These assertions would provide more comprehensive coverage of the only_new 
functionality.
   ```suggestion
   
           # Existing task instances should remain in SUCCESS state and not be 
cleared
           ti0_after, ti1_after = sorted(
               (
                   session.execute(
                       select(TaskInstance).where(
                           TaskInstance.dag_id == dag.dag_id,
                           TaskInstance.run_id == dr.run_id,
                           TaskInstance.task_id == task_id,
                       )
                   )
                   .scalars()
                   .one()
                   for task_id in ["0", "1"]
               ),
               key=lambda ti: ti.task_id,
           )
           assert ti0_after.state == TaskInstanceState.SUCCESS
           assert ti1_after.state == TaskInstanceState.SUCCESS
   
           # New task instances for tasks "2" and "3" should be created in the 
DB
           ti2, ti3 = sorted(
               (
                   session.execute(
                       select(TaskInstance).where(
                           TaskInstance.dag_id == dag.dag_id,
                           TaskInstance.run_id == dr.run_id,
                           TaskInstance.task_id == task_id,
                       )
                   )
                   .scalars()
                   .one()
                   for task_id in ["2", "3"]
               ),
               key=lambda ti: ti.task_id,
           )
           assert ti2.state == TaskInstanceState.NONE
           assert ti3.state == TaskInstanceState.NONE
   
           # DagRun should be updated to the new dag version and queued
           assert dr.created_dag_version_id == new_dag_version.id
           assert dr.state == DagRunState.QUEUED
   ```



##########
airflow-core/tests/unit/models/test_cleartasks.py:
##########
@@ -738,3 +738,172 @@ def 
test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver
             assert TaskInstanceState.REMOVED not in [ti.state for ti in 
dr.task_instances]
             for ti in dr.task_instances:
                 assert ti.dag_version_id == old_dag_version.id
+
+    def test_clear_only_new_tasks(self, dag_maker, session):
+        """Test that only_new queues only newly added tasks without clearing 
existing ones."""
+
+        with dag_maker(
+            "test_clear_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v1",
+        ) as dag:
+            task0 = EmptyOperator(task_id="0")
+            task1 = EmptyOperator(task_id="1")
+        dr = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+        ti0.refresh_from_task(dag.get_task("0"))
+        ti1.refresh_from_task(dag.get_task("1"))
+
+        run_task_instance(ti0, task0)
+        run_task_instance(ti1, task1)
+        dr.state = DagRunState.SUCCESS
+        session.merge(dr)
+        session.flush()
+
+        with dag_maker(
+            "test_clear_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v2",
+        ) as dag:
+            EmptyOperator(task_id="0")
+            EmptyOperator(task_id="1")
+            EmptyOperator(task_id="2")
+            EmptyOperator(task_id="3")
+
+        new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        assert old_dag_version.id != new_dag_version.id
+
+        count = dag.clear(
+            run_id=dr.run_id,
+            only_new=True,
+            session=session,
+        )
+
+        session.flush()
+        dr.refresh_from_db(session)
+
+        # Should return 2 new tasks
+        assert count == 2
+
+    def test_clear_only_new_tasks_dry_run(self, dag_maker, session):
+        """Test that only_new with dry_run returns new tasks without making 
changes."""
+        with dag_maker(
+            "test_clear_new_task_instances_dry_run",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v1",
+        ) as dag:
+            task0 = EmptyOperator(task_id="0")
+            task1 = EmptyOperator(task_id="1")
+        dr = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+        ti0.refresh_from_task(dag.get_task("0"))
+        ti1.refresh_from_task(dag.get_task("1"))
+
+        run_task_instance(ti0, task0)
+        run_task_instance(ti1, task1)
+        dr.state = DagRunState.SUCCESS
+        session.merge(dr)
+        session.flush()
+
+        with dag_maker(
+            "test_clear_new_task_instances_dry_run",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v2",
+        ) as dag:
+            EmptyOperator(task_id="0")
+            EmptyOperator(task_id="1")
+            EmptyOperator(task_id="2")
+            EmptyOperator(task_id="3")
+
+        new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        assert old_dag_version.id != new_dag_version.id
+
+        # Dry run - should return list of new tasks without making changes
+        new_tis = dag.clear(
+            run_id=dr.run_id,
+            only_new=True,
+            dry_run=True,
+            session=session,
+        )
+
+        session.flush()
+        dr.refresh_from_db(session)
+
+        # Check if correct tasks are returned
+        assert len(new_tis) == 2
+        assert [new_tis[0].task_id, new_tis[1].task_id] == ["2", "3"]
+        assert [new_tis[0].state, new_tis[1].state] == [None, None]
+
+        # Verify no changes were made to the database
+        assert dr.created_dag_version_id == old_dag_version.id
+        assert len(dr.task_instances) == 2  # should be only the 2 earlier 
tasks
+
+    def test_clear_only_new_no_new_tasks(self, dag_maker, session):
+        """Test that only_new returns 0 when no new tasks are added."""
+        with dag_maker(
+            "test_clear_no_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v1",
+        ) as dag:
+            task0 = EmptyOperator(task_id="0")
+            task1 = EmptyOperator(task_id="1")
+        dr = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+        ti0.refresh_from_task(dag.get_task("0"))
+        ti1.refresh_from_task(dag.get_task("1"))
+
+        run_task_instance(ti0, task0)
+        run_task_instance(ti1, task1)
+        dr.state = DagRunState.SUCCESS
+        session.merge(dr)
+        session.flush()
+
+        with dag_maker(
+            "test_clear_no_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v2",
+        ) as dag:
+            EmptyOperator(task_id="0")
+            EmptyOperator(task_id="1")
+
+        new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        assert old_dag_version.id != new_dag_version.id
+
+        # Clear with only_new should return 0
+        count = dag.clear(
+            run_id=dr.run_id,
+            only_new=True,
+            session=session,
+        )
+
+        assert count == 0

Review Comment:
   The test coverage is missing a test case for when both `task_ids` and 
`only_new=True` are provided to `dag.clear()`. This would help verify the 
expected behavior (whether it should raise an error or whether one parameter 
takes precedence).
   ```suggestion
           assert count == 0
   
       def test_clear_only_new_with_task_ids_no_new_tasks(self, dag_maker, 
session):
           """Test that only_new with task_ids returns 0 when no new tasks are 
added."""
           with dag_maker(
               "test_clear_no_new_task_instances",
               start_date=DEFAULT_DATE,
               end_date=DEFAULT_DATE + datetime.timedelta(days=10),
               catchup=True,
               bundle_version="v1",
           ) as dag:
               task0 = EmptyOperator(task_id="0")
               task1 = EmptyOperator(task_id="1")
           dr = dag_maker.create_dagrun(
               state=State.RUNNING,
               run_type=DagRunType.SCHEDULED,
           )
   
           old_dag_version = DagVersion.get_latest_version(dr.dag_id)
           ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
           ti0.refresh_from_task(dag.get_task("0"))
           ti1.refresh_from_task(dag.get_task("1"))
   
           run_task_instance(ti0, task0)
           run_task_instance(ti1, task1)
           dr.state = DagRunState.SUCCESS
           session.merge(dr)
           session.flush()
   
           with dag_maker(
               "test_clear_no_new_task_instances",
               start_date=DEFAULT_DATE,
               end_date=DEFAULT_DATE + datetime.timedelta(days=10),
               catchup=True,
               bundle_version="v2",
           ) as dag:
               EmptyOperator(task_id="0")
               EmptyOperator(task_id="1")
   
           new_dag_version = DagVersion.get_latest_version(dag.dag_id)
   
           assert old_dag_version.id != new_dag_version.id
   
           # Clear with only_new and task_ids should return 0
           count = dag.clear(
               run_id=dr.run_id,
               task_ids=["0", "1"],
               only_new=True,
               session=session,
           )
   
           assert count == 0
   ```



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
     # These changes are committed by the calling function.
 
 
+def _get_new_tasks(
+    dag_id: str,
+    run_id: str,
+    session: Session,
+) -> Collection[str | tuple[str, int]]:
+    """
+    Get task instances for newly added tasks in the latest DAG version.
+
+    Updates the DAG run to the latest version if new tasks are found and
+    creates task instances for them via verify_integrity.
+
+    :param run_id: The run_id for the DAG run
+    :param dag_id: The dag_id for the DAG
+    :param session: SQLAlchemy session
+    :return: List of task instances for newly added tasks
+    """
+    from airflow.models.dagbag import DBDagBag
+    from airflow.models.dagrun import DagRun
+
+    dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, 
run_id=run_id))
+    if not dag_run:
+        raise ValueError(f"DagRun with run_id '{run_id}' not found")
+
+    scheduler_dagbag = DBDagBag(load_op_links=False)
+    latest_dag = scheduler_dagbag.get_latest_version_of_dag(dag_id, 
session=session)
+
+    if not latest_dag:
+        raise ValueError(f"Latest DAG version for '{dag_id}' not found")
+
+    current_dag = scheduler_dagbag.get_dag_for_run(dag_run=dag_run, 
session=session)
+    new_task_ids = set(latest_dag.task_ids) - set(current_dag.task_ids) if 
current_dag else set()

Review Comment:
   When `current_dag` is `None` (line 260), the function returns an empty list 
of new tasks even though there's a valid `latest_dag`. This edge case might 
occur if the DagRun's version is no longer available. Consider logging a 
warning message in this scenario to help users debug why no tasks are being 
queued, or raising an exception if this is an unexpected state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to