amoghrajesh commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1898536154


##########
task_sdk/tests/execution_time/test_task_runner.py:
##########
@@ -256,6 +256,84 @@ def test_run_basic_skipped(time_machine, mocked_parse, 
make_ti_context):
         )
 
 
+def test_run_raises_base_exception(time_machine, mocked_parse, 
make_ti_context):
+    """Test running a basic task that raises a base exception which should 
send fail_with_retry state."""
+    from airflow.providers.standard.operators.python import PythonOperator
+
+    task = PythonOperator(
+        task_id="zero_division_error",
+        python_callable=lambda: 1 / 0,
+    )
+
+    what = StartupDetails(
+        ti=TaskInstance(
+            id=uuid7(),
+            task_id="zero_division_error",
+            dag_id="basic_dag_base_exception",
+            run_id="c",
+            try_number=1,
+        ),
+        file="",
+        requests_fd=0,
+        ti_context=make_ti_context(),
+    )
+
+    ti = mocked_parse(what, "basic_dag_base_exception", task)
+
+    instant = timezone.datetime(2024, 12, 3, 10, 0)
+    time_machine.move_to(instant, tick=False)
+
+    with mock.patch(
+        "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+    ) as mock_supervisor_comms:
+        run(ti, log=mock.MagicMock())
+
+        mock_supervisor_comms.send_request.assert_called_once_with(
+            msg=TaskState(
+                state=TerminalTIState.FAIL_WITH_RETRY,
+                end_date=instant,
+            ),
+            log=mock.ANY,
+        )
+
+
+def test_startup_basic_templated_dag(mocked_parse, make_ti_context):

Review Comment:
   This probably shows up in the diff only because I moved this test down.



##########
tests/api_fastapi/execution_api/routes/test_task_instances.py:
##########
@@ -340,6 +340,43 @@ def test_ti_update_state_to_reschedule(self, client, 
session, create_task_instan
         assert trs[0].map_index == -1
         assert trs[0].duration == 129600
 
+    @pytest.mark.parametrize(
+        ("retries", "expected_state"),
+        [
+            (0, State.FAILED),
+            (None, State.FAILED),

Review Comment:
   These should "fail": the first one has retries set to 0, and the second one 
doesn't even have the parameter configured.



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -205,11 +209,16 @@ def ti_update_state(
 
     if isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, 
query, session.bind)
-        query = query.values(state=ti_patch_payload.state)
-        if ti_patch_payload.state == State.FAILED:
-            # clear the next_method and next_kwargs
-            query = query.values(next_method=None, next_kwargs=None)
-            updated_state = State.FAILED
+        updated_state = (
+            State.FAILED
+            if ti_patch_payload.state == TerminalTIState.FAIL_WITH_RETRY
+            else ti_patch_payload.state
+        )
+        query = query.values(next_method=None, next_kwargs=None, 
state=updated_state)

Review Comment:
   We can clear `next_method` and `next_kwargs` for all the terminal 
states.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to