o-nikolas commented on code in PR #39046:
URL: https://github.com/apache/airflow/pull/39046#discussion_r1568018949


##########
tests/models/test_cleartasks.py:
##########
@@ -113,6 +113,99 @@ def test_clear_task_instances_external_executor_id(self, 
dag_maker):
             assert ti0.state is None
             assert ti0.external_executor_id is None
 
+    @pytest.mark.parametrize(
+        "state, new_executor",
+        [
+            (State.QUEUED, "bar_executor"),
+            (State.SCHEDULED, "bar_executor"),
+            (State.SUCCESS, "bar_executor"),
+            (State.NONE, "bar_executor"),
+            (State.RESTARTING, "bar_executor"),
+            (State.RUNNING, "bar_executor"),
+        ],
+    )
+    def test_clear_task_instances_executor_modified(self, dag_maker, state, 
new_executor):
+        """Test that the executor field is refreshed from the task if a task 
instance is refreshed"""
+        original_executor = "foo_executor"
+        with dag_maker(
+            "test_clear_task_instances_executor",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+        ) as dag:
+            EmptyOperator(task_id="task0", executor=original_executor)
+
+        ti0 = dag_maker.create_dagrun().task_instances[0]
+        ti0.state = state
+
+        with create_session() as session:
+            session.add(ti0)
+            session.commit()
+
+            # Update the dag after the task has been scheduled, to update the 
executor field.
+            # After the clear task below, the TI should now contain the latest 
executor name
+            with dag_maker(
+                "test_clear_task_instances_executor",
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            ) as _:
+                EmptyOperator(task_id="task0", executor=new_executor)
+            # we use order_by(task_id) here because for the test DAG structure 
of ours
+            # this is equivalent to topological sort. It would not work in 
general case
+            # but it works for our case because we specifically constructed 
test DAGS
+            # in the way that those two sort methods are equivalent
+            qry = session.query(TI).filter(TI.dag_id == 
dag.dag_id).order_by(TI.task_id).all()
+            clear_task_instances(qry, session)
+
+            if state == State.RUNNING:
+                # TI state is not cleared if it is running, it is instead set 
to RESTARTING so that it will
+                # be retried.

Review Comment:
   > Just to be clear, if the executor is changed for a Task while it is 
running, the task will be retried with the new executor?
   
   Nope! Tasks that are "cleared" while they are running are not actually 
cleared; they're just set to be retried, and we don't modify the executor 
during retries. This is an existing mechanism; I didn't introduce it in this PR.
   
   If a user wanted the task to pick up the new executor, they would first mark 
the running task as failed (or success, I suppose); then they could properly 
clear it, which would refresh the executor.
     



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to