jedcunningham commented on code in PR #23079:
URL: https://github.com/apache/airflow/pull/23079#discussion_r1281294660


##########
tests/models/test_dag.py:
##########
@@ -2381,6 +2382,57 @@ def consumer(value):
     ]
 
 
+@pytest.mark.parametrize("run_id, execution_date", [(None, datetime_tz(2020, 
1, 1)), ('test-run-id', None)])
+def test_set_task_instance_state_failed_downstream(run_id, execution_date, 
session, dag_maker):
+    """Test that set_task_instance_state updates the TaskInstance state to 
failed but doesn't clear downstream
+    tasks since the state to be set is already failed."""
+
+    start_date = datetime_tz(2020, 1, 1)
+    with dag_maker("test_set_task_instance_state", start_date=start_date, 
session=session) as dag:
+        task_1 = DummyOperator(task_id="task_1")
+        task_2 = DummyOperator(task_id="task_2")
+        task_3 = DummyOperator(task_id="task_3")
+
+        task_1 >> task_2 >> task_3
+
+    dagrun = dag_maker.create_dagrun(
+        run_id=run_id,
+        execution_date=execution_date,
+        state=State.SUCCESS,
+        run_type=DagRunType.SCHEDULED,
+    )
+
+    def get_ti_from_db(task):
+        return (
+            session.query(TI)
+            .filter(
+                TI.dag_id == dag.dag_id,
+                TI.task_id == task.task_id,
+                TI.run_id == dagrun.run_id,
+            )
+            .one()
+        )
+
+    get_ti_from_db(task_1).state = State.SUCCESS
+    get_ti_from_db(task_2).state = State.SUCCESS
+    get_ti_from_db(task_3).state = State.SUCCESS
+
+    session.flush()
+
+    dag.set_task_instance_state(
+        task_id=task_1.task_id,
+        run_id=run_id,
+        execution_date=execution_date,
+        state=State.FAILED,
+        session=session,
+        downstream=True,
+    )
+
+    assert get_ti_from_db(task_1).state == State.FAILED
+    assert get_ti_from_db(task_2).state == State.FAILED
+    assert get_ti_from_db(task_3).state == State.FAILED

Review Comment:
   My 2c: it needs to be consistent. If we like the behavior where "success + 
downstream" clears the downstream tasks, then "failed + downstream" should do 
the same thing.
   
   So I think the current behavior is correct, but the dialog could be 
updated to indicate that the downstream tasks are cleared.
   
   Here is a reason you'd want clearing instead of just setting everything 
to failed: what if you have downstream tasks with `one_failed` or `all_failed` 
as a trigger rule? Forcing everything downstream to failed assumes 
`all_success`...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to