jedcunningham commented on code in PR #59764:
URL: https://github.com/apache/airflow/pull/59764#discussion_r2820417101


##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -949,6 +951,7 @@ def clear(
         end_date: datetime.datetime | None = None,
         only_failed: bool = False,
         only_running: bool = False,
+        only_new: bool = False,

Review Comment:
   ```suggestion
   ```
   
   If we are requiring a run_id, we shouldn't add it to this overload, right?



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
     # These changes are committed by the calling function.
 
 
+def _get_new_tasks(
+    dag_id: str,
+    run_id: str,
+    session: Session,
+) -> Collection[str | tuple[str, int]]:
+    """
+    Get task instances for newly added tasks in the latest DAG version.
+
+    Updates the DAG run to the latest version if new tasks are found and
+    creates task instances for them via verify_integrity.
+
+    :param run_id: The run_id for the DAG run
+    :param dag_id: The dag_id for the DAG

Review Comment:
   ```suggestion
       :param dag_id: The dag_id for the DAG
       :param run_id: The run_id for the DAG run
   ```
   
   order is wrong



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
     # These changes are committed by the calling function.
 
 
+def _get_new_tasks(
+    dag_id: str,
+    run_id: str,
+    session: Session,
+) -> Collection[str | tuple[str, int]]:
+    """
+    Get task instances for newly added tasks in the latest DAG version.

Review Comment:
   ```suggestion
       Get task ids for newly added tasks in the latest DAG version.
   ```
   
   nit



##########
airflow-core/tests/unit/models/test_cleartasks.py:
##########
@@ -738,3 +738,172 @@ def test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver
             assert TaskInstanceState.REMOVED not in [ti.state for ti in dr.task_instances]
             for ti in dr.task_instances:
                 assert ti.dag_version_id == old_dag_version.id
+
+    def test_clear_only_new_tasks(self, dag_maker, session):
+        """Test that only_new queues only newly added tasks without clearing existing ones."""
+
+        with dag_maker(
+            "test_clear_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,

Review Comment:
   ```suggestion
   ```
   
   We don't need these, right?



##########
airflow-core/tests/unit/models/test_cleartasks.py:
##########
@@ -738,3 +738,172 @@ def test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver
             assert TaskInstanceState.REMOVED not in [ti.state for ti in dr.task_instances]
             for ti in dr.task_instances:
                 assert ti.dag_version_id == old_dag_version.id
+
+    def test_clear_only_new_tasks(self, dag_maker, session):
+        """Test that only_new queues only newly added tasks without clearing existing ones."""
+
+        with dag_maker(
+            "test_clear_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v1",
+        ) as dag:
+            task0 = EmptyOperator(task_id="0")
+            task1 = EmptyOperator(task_id="1")
+        dr = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+        ti0.refresh_from_task(dag.get_task("0"))
+        ti1.refresh_from_task(dag.get_task("1"))
+
+        run_task_instance(ti0, task0)
+        run_task_instance(ti1, task1)
+        dr.state = DagRunState.SUCCESS
+        session.merge(dr)
+        session.flush()
+
+        with dag_maker(
+            "test_clear_new_task_instances",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v2",
+        ) as dag:
+            EmptyOperator(task_id="0")
+            EmptyOperator(task_id="1")
+            EmptyOperator(task_id="2")
+            EmptyOperator(task_id="3")
+
+        new_dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        assert old_dag_version.id != new_dag_version.id
+
+        count = dag.clear(
+            run_id=dr.run_id,
+            only_new=True,
+            session=session,
+        )
+
+        session.flush()
+        dr.refresh_from_db(session)
+
+        # Should return 2 new tasks
+        assert count == 2

Review Comment:
   ```suggestion
           assert count == 2
   
           session.flush()
           dr.refresh_from_db(session)
           assert dr.created_dag_version_id == new_dag_version.id
   ```
   
   This order is a bit funky. Let's assert the count right away. Also, the comment doesn't add value.
   
   Were you planning to assert something else? Otherwise, why flush/refresh the DR? Likely you wanted to check the dag version on the run (that change is untested here)?
   
   Should we also check that the TIs for the new tasks are there?



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
     # These changes are committed by the calling function.
 
 
+def _get_new_tasks(
+    dag_id: str,
+    run_id: str,
+    session: Session,
+) -> Collection[str | tuple[str, int]]:
+    """
+    Get task instances for newly added tasks in the latest DAG version.
+
+    Updates the DAG run to the latest version if new tasks are found and
+    creates task instances for them via verify_integrity.
+
+    :param run_id: The run_id for the DAG run
+    :param dag_id: The dag_id for the DAG
+    :param session: SQLAlchemy session
+    :return: List of task instances for newly added tasks
+    """
+    from airflow.models.dagbag import DBDagBag
+    from airflow.models.dagrun import DagRun
+
+    dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id))
+    if not dag_run:
+        raise ValueError(f"DagRun with run_id '{run_id}' not found")
+
+    scheduler_dagbag = DBDagBag(load_op_links=False)
+    latest_dag = scheduler_dagbag.get_latest_version_of_dag(dag_id, session=session)
+
+    if not latest_dag:
+        raise ValueError(f"Latest DAG version for '{dag_id}' not found")
+
+    current_dag = scheduler_dagbag.get_dag_for_run(dag_run=dag_run, session=session)
+    new_task_ids = set(latest_dag.task_ids) - set(current_dag.task_ids) if current_dag else set()
+
+    if new_task_ids:
+        dag_version = DagVersion.get_latest_version(dag_id, session=session)
+        if dag_version:
+            dag_run.created_dag_version_id = dag_version.id

Review Comment:
   This is mutating the dag version etc. even in dry run mode.
   
   It might be better to split the getting and mutating into separate functions.
   
   I know you added a test, but I wonder if there is something funky in the test hiding the behavior. Something is weird here. Either way, we shouldn't do this in the first place for dry run mode.



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
     # These changes are committed by the calling function.
 
 
+def _get_new_tasks(
+    dag_id: str,
+    run_id: str,
+    session: Session,
+) -> Collection[str | tuple[str, int]]:
+    """
+    Get task instances for newly added tasks in the latest DAG version.
+
+    Updates the DAG run to the latest version if new tasks are found and
+    creates task instances for them via verify_integrity.
+
+    :param run_id: The run_id for the DAG run
+    :param dag_id: The dag_id for the DAG
+    :param session: SQLAlchemy session
+    :return: List of task instances for newly added tasks
+    """
+    from airflow.models.dagbag import DBDagBag
+    from airflow.models.dagrun import DagRun
+
+    dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id))
+    if not dag_run:
+        raise ValueError(f"DagRun with run_id '{run_id}' not found")
+
+    scheduler_dagbag = DBDagBag(load_op_links=False)
+    latest_dag = scheduler_dagbag.get_latest_version_of_dag(dag_id, session=session)
+
+    if not latest_dag:
+        raise ValueError(f"Latest DAG version for '{dag_id}' not found")
+
+    current_dag = scheduler_dagbag.get_dag_for_run(dag_run=dag_run, session=session)
+    new_task_ids = set(latest_dag.task_ids) - set(current_dag.task_ids) if current_dag else set()
+
+    if new_task_ids:
+        dag_version = DagVersion.get_latest_version(dag_id, session=session)
+        if dag_version:

Review Comment:
   You also have to update bundle_version. Otherwise, with versioned bundles, it's likely you won't even have the new tasks once you get to a worker.



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -227,6 +227,49 @@ def _recalculate_dagrun_queued_at_deadlines(
     # These changes are committed by the calling function.
 
 
+def _get_new_tasks(
+    dag_id: str,
+    run_id: str,
+    session: Session,
+) -> Collection[str | tuple[str, int]]:

Review Comment:
   ```suggestion
   ) -> list[str]:
   ```



##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -965,6 +968,7 @@ def clear(
         end_date: datetime.datetime | None = None,
         only_failed: bool = False,
         only_running: bool = False,
+        only_new: bool = False,

Review Comment:
   ```suggestion
   ```
   
   Or this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
