amoghrajesh commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1899335595


##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -422,16 +422,15 @@ def run(ti: RuntimeTaskInstance, log: Logger):
         # updated already be another UI API. So, these exceptions should 
ideally never be thrown.
         # If these are thrown, we should mark the TI state as failed.
         msg = TaskState(
-            state=TerminalTIState.FAILED,
+            state=TerminalTIState.FAIL_WITHOUT_RETRY,

Review Comment:
   Alright, will investigate in a follow up. For what is worth, it should never 
be thrown actually. This is just a safety hatch in case it is



##########
task_sdk/tests/execution_time/test_task_runner.py:
##########
@@ -256,6 +256,84 @@ def test_run_basic_skipped(time_machine, mocked_parse, 
make_ti_context):
         )
 
 
+def test_run_raises_base_exception(time_machine, mocked_parse, 
make_ti_context):
+    """Test running a basic task that raises a base exception which should 
send fail_with_retry state."""
+    from airflow.providers.standard.operators.python import PythonOperator
+
+    task = PythonOperator(
+        task_id="zero_division_error",
+        python_callable=lambda: 1 / 0,
+    )
+
+    what = StartupDetails(
+        ti=TaskInstance(
+            id=uuid7(),
+            task_id="zero_division_error",
+            dag_id="basic_dag_base_exception",
+            run_id="c",
+            try_number=1,
+        ),
+        file="",
+        requests_fd=0,
+        ti_context=make_ti_context(),
+    )
+
+    ti = mocked_parse(what, "basic_dag_base_exception", task)
+
+    instant = timezone.datetime(2024, 12, 3, 10, 0)
+    time_machine.move_to(instant, tick=False)
+
+    with mock.patch(
+        "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+    ) as mock_supervisor_comms:
+        run(ti, log=mock.MagicMock())
+
+        mock_supervisor_comms.send_request.assert_called_once_with(
+            msg=TaskState(
+                state=TerminalTIState.FAILED,
+                end_date=instant,
+            ),
+            log=mock.ANY,
+        )
+
+
+def test_startup_basic_templated_dag(mocked_parse, make_ti_context):
+    """Test running a DAG with templated task."""
+    from airflow.providers.standard.operators.bash import BashOperator
+
+    task = BashOperator(
+        task_id="templated_task",
+        bash_command="echo 'Logical date is {{ logical_date }}'",
+    )
+
+    what = StartupDetails(
+        ti=TaskInstance(
+            id=uuid7(), task_id="templated_task", 
dag_id="basic_templated_dag", run_id="c", try_number=1
+        ),
+        file="",
+        requests_fd=0,
+        ti_context=make_ti_context(),
+    )
+    mocked_parse(what, "basic_templated_dag", task)
+
+    with mock.patch(
+        "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+    ) as mock_supervisor_comms:

Review Comment:
   Thanks, resolved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to