ashb commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1898615880


##########
task_sdk/src/airflow/sdk/api/datamodels/_generated.py:
##########
@@ -126,9 +126,12 @@ class TerminalTIState(str, Enum):
     """
 
     SUCCESS = "success"
-    FAILED = "failed"
+    FAILED = "failed"  # This state indicates that we attempt to retry.
     SKIPPED = "skipped"
     REMOVED = "removed"
+    FAIL_WITHOUT_RETRY = (
+        "fail_without_retry"  # This state is useful for when we want to terminate a task, without retrying.
+    )

Review Comment:
   Auto-generation will nuke these comments.
   
   ```suggestion
       FAILED = "failed"
       SKIPPED = "skipped"
       REMOVED = "removed"
       FAIL_WITHOUT_RETRY = "fail_without_retry"
   ```



##########
airflow/utils/state.py:
##########
@@ -36,9 +36,12 @@ class TerminalTIState(str, Enum):
     """States that a Task Instance can be in that indicate it has reached a terminal state."""
 
     SUCCESS = "success"
-    FAILED = "failed"
+    FAILED = "failed"  # This state indicates that we attempt to retry.

Review Comment:
   This enum is used in lots of other places, so we can't really add this comment here.
   ```suggestion
       FAILED = "failed"
   ```



##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -422,16 +422,15 @@ def run(ti: RuntimeTaskInstance, log: Logger):
         # updated already be another UI API. So, these exceptions should ideally never be thrown.
         # If these are thrown, we should mark the TI state as failed.
         msg = TaskState(
-            state=TerminalTIState.FAILED,
+            state=TerminalTIState.FAIL_WITHOUT_RETRY,

Review Comment:
   I think this one should still be FAILED?



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -205,11 +209,17 @@ def ti_update_state(
 
     if isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
-        query = query.values(state=ti_patch_payload.state)
-        if ti_patch_payload.state == State.FAILED:
-            # clear the next_method and next_kwargs
-            query = query.values(next_method=None, next_kwargs=None)
+        updated_state = ti_patch_payload.state
+        # if we get failed, we should attempt to retry, as it is a more
+        # normal state. Tasks with retries are more frequent than without retries.
+        if ti_patch_payload.state == TerminalTIState.FAIL_WITHOUT_RETRY:

Review Comment:
   I think we are missing a test case in test_task_instance.py for sending FAIL_WITHOUT_RETRY.
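
   A rough, self-contained sketch of the cases such a test could pin down. The hunk above is cut off right at the `FAIL_WITHOUT_RETRY` branch, so the mapping below is an assumption rather than the PR's actual code: `resolve_stored_state` and `can_retry` are hypothetical names, and `"up_for_retry"` is assumed to be the state stored when a retry is scheduled.

```python
from enum import Enum


class TerminalTIState(str, Enum):
    """Values copied from the diff in this review."""

    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    REMOVED = "removed"
    FAIL_WITHOUT_RETRY = "fail_without_retry"


def resolve_stored_state(reported: TerminalTIState, can_retry: bool) -> str:
    """Hypothetical stand-in for the state branch in ti_update_state.

    Assumed behaviour, not taken from the PR: FAIL_WITHOUT_RETRY is always
    stored as "failed", while FAILED is stored as "up_for_retry" only when
    the task still has retries left.
    """
    if reported == TerminalTIState.FAIL_WITHOUT_RETRY:
        return "failed"
    if reported == TerminalTIState.FAILED and can_retry:
        return "up_for_retry"
    return reported.value


def test_fail_without_retry_never_retries():
    # Even with retries left, the stored state must be plain "failed".
    assert resolve_stored_state(TerminalTIState.FAIL_WITHOUT_RETRY, can_retry=True) == "failed"
    assert resolve_stored_state(TerminalTIState.FAIL_WITHOUT_RETRY, can_retry=False) == "failed"


def test_failed_retries_only_when_eligible():
    # FAILED is the "attempt to retry" state; it falls back to "failed"
    # once no retries remain.
    assert resolve_stored_state(TerminalTIState.FAILED, can_retry=True) == "up_for_retry"
    assert resolve_stored_state(TerminalTIState.FAILED, can_retry=False) == "failed"


test_fail_without_retry_never_retries()
test_failed_retries_only_when_eligible()
```

   The real test would presumably send the payload through the route and assert on the stored TI state instead of calling a helper like this. The point is only that `FAIL_WITHOUT_RETRY` with retries remaining is the case that currently has no coverage.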



##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -422,16 +422,15 @@ def run(ti: RuntimeTaskInstance, log: Logger):
         # updated already be another UI API. So, these exceptions should ideally never be thrown.
         # If these are thrown, we should mark the TI state as failed.
         msg = TaskState(
-            state=TerminalTIState.FAILED,
+            state=TerminalTIState.FAIL_WITHOUT_RETRY,

Review Comment:
   Now that I think about it: I'm not sure this exception can ever actually be thrown. It looks like TI.heartbeat would notice the state before that and throw this exception "in" to the task? Not sure, let's look at that in a future PR.



##########
task_sdk/tests/execution_time/test_task_runner.py:
##########
@@ -256,6 +256,84 @@ def test_run_basic_skipped(time_machine, mocked_parse, make_ti_context):
         )
 
 
+def test_run_raises_base_exception(time_machine, mocked_parse, make_ti_context):
+    """Test running a basic task that raises a base exception which should send fail_with_retry state."""
+    from airflow.providers.standard.operators.python import PythonOperator
+
+    task = PythonOperator(
+        task_id="zero_division_error",
+        python_callable=lambda: 1 / 0,
+    )
+
+    what = StartupDetails(
+        ti=TaskInstance(
+            id=uuid7(),
+            task_id="zero_division_error",
+            dag_id="basic_dag_base_exception",
+            run_id="c",
+            try_number=1,
+        ),
+        file="",
+        requests_fd=0,
+        ti_context=make_ti_context(),
+    )
+
+    ti = mocked_parse(what, "basic_dag_base_exception", task)
+
+    instant = timezone.datetime(2024, 12, 3, 10, 0)
+    time_machine.move_to(instant, tick=False)
+
+    with mock.patch(
+        "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+    ) as mock_supervisor_comms:
+        run(ti, log=mock.MagicMock())
+
+        mock_supervisor_comms.send_request.assert_called_once_with(
+            msg=TaskState(
+                state=TerminalTIState.FAILED,
+                end_date=instant,
+            ),
+            log=mock.ANY,
+        )
+
+
+def test_startup_basic_templated_dag(mocked_parse, make_ti_context):
+    """Test running a DAG with templated task."""
+    from airflow.providers.standard.operators.bash import BashOperator
+
+    task = BashOperator(
+        task_id="templated_task",
+        bash_command="echo 'Logical date is {{ logical_date }}'",
+    )
+
+    what = StartupDetails(
+        ti=TaskInstance(
+            id=uuid7(), task_id="templated_task", dag_id="basic_templated_dag", run_id="c", try_number=1
+        ),
+        file="",
+        requests_fd=0,
+        ti_context=make_ti_context(),
+    )
+    mocked_parse(what, "basic_templated_dag", task)
+
+    with mock.patch(
+        "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+    ) as mock_supervisor_comms:

Review Comment:
   Will conflict with #45245 -- heads up @kaxil @amoghrajesh 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
