amoghrajesh commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1899335595
##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -422,16 +422,15 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# updated already be another UI API. So, these exceptions should
ideally never be thrown.
# If these are thrown, we should mark the TI state as failed.
msg = TaskState(
- state=TerminalTIState.FAILED,
+ state=TerminalTIState.FAIL_WITHOUT_RETRY,
Review Comment:
Alright, will investigate in a follow-up. For what it's worth, it should never
actually be thrown. This is just a safety hatch in case it is.
##########
task_sdk/tests/execution_time/test_task_runner.py:
##########
@@ -256,6 +256,84 @@ def test_run_basic_skipped(time_machine, mocked_parse,
make_ti_context):
)
+def test_run_raises_base_exception(time_machine, mocked_parse,
make_ti_context):
+ """Test running a basic task that raises a base exception which should
send fail_with_retry state."""
+ from airflow.providers.standard.operators.python import PythonOperator
+
+ task = PythonOperator(
+ task_id="zero_division_error",
+ python_callable=lambda: 1 / 0,
+ )
+
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(),
+ task_id="zero_division_error",
+ dag_id="basic_dag_base_exception",
+ run_id="c",
+ try_number=1,
+ ),
+ file="",
+ requests_fd=0,
+ ti_context=make_ti_context(),
+ )
+
+ ti = mocked_parse(what, "basic_dag_base_exception", task)
+
+ instant = timezone.datetime(2024, 12, 3, 10, 0)
+ time_machine.move_to(instant, tick=False)
+
+ with mock.patch(
+ "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+ ) as mock_supervisor_comms:
+ run(ti, log=mock.MagicMock())
+
+ mock_supervisor_comms.send_request.assert_called_once_with(
+ msg=TaskState(
+ state=TerminalTIState.FAILED,
+ end_date=instant,
+ ),
+ log=mock.ANY,
+ )
+
+
+def test_startup_basic_templated_dag(mocked_parse, make_ti_context):
+ """Test running a DAG with templated task."""
+ from airflow.providers.standard.operators.bash import BashOperator
+
+ task = BashOperator(
+ task_id="templated_task",
+ bash_command="echo 'Logical date is {{ logical_date }}'",
+ )
+
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(), task_id="templated_task",
dag_id="basic_templated_dag", run_id="c", try_number=1
+ ),
+ file="",
+ requests_fd=0,
+ ti_context=make_ti_context(),
+ )
+ mocked_parse(what, "basic_templated_dag", task)
+
+ with mock.patch(
+ "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+ ) as mock_supervisor_comms:
Review Comment:
Thanks, resolved
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]