ashb commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1898615880
##########
task_sdk/src/airflow/sdk/api/datamodels/_generated.py:
##########
@@ -126,9 +126,12 @@ class TerminalTIState(str, Enum):
"""
SUCCESS = "success"
- FAILED = "failed"
+ FAILED = "failed" # This state indicates that we attempt to retry.
SKIPPED = "skipped"
REMOVED = "removed"
+ FAIL_WITHOUT_RETRY = (
+ "fail_without_retry" # This state is useful for when we want to
terminate a task, without retrying.
+ )
Review Comment:
This file is auto-generated, so regenerating it will wipe out these comments.
```suggestion
FAILED = "failed"
SKIPPED = "skipped"
REMOVED = "removed"
FAIL_WITHOUT_RETRY = "fail_without_retry"
```
##########
airflow/utils/state.py:
##########
@@ -36,9 +36,12 @@ class TerminalTIState(str, Enum):
"""States that a Task Instance can be in that indicate it has reached a
terminal state."""
SUCCESS = "success"
- FAILED = "failed"
+ FAILED = "failed" # This state indicates that we attempt to retry.
Review Comment:
This enum is used in lots of other places, so we can't really add this comment
here.
```suggestion
FAILED = "failed"
```
##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -422,16 +422,15 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# updated already be another UI API. So, these exceptions should
ideally never be thrown.
# If these are thrown, we should mark the TI state as failed.
msg = TaskState(
- state=TerminalTIState.FAILED,
+ state=TerminalTIState.FAIL_WITHOUT_RETRY,
Review Comment:
I think this one should still be FAILED?
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -205,11 +209,17 @@ def ti_update_state(
if isinstance(ti_patch_payload, TITerminalStatePayload):
query = TI.duration_expression_update(ti_patch_payload.end_date,
query, session.bind)
- query = query.values(state=ti_patch_payload.state)
- if ti_patch_payload.state == State.FAILED:
- # clear the next_method and next_kwargs
- query = query.values(next_method=None, next_kwargs=None)
+ updated_state = ti_patch_payload.state
+ # if we get failed, we should attempt to retry, as it is a more
+ # normal state. Tasks with retries are more frequent than without
retries.
+ if ti_patch_payload.state == TerminalTIState.FAIL_WITHOUT_RETRY:
Review Comment:
I think we are missing a test case in test_task_instance.py that sends
FAIL_WITHOUT_RETRY.
##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -422,16 +422,15 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# updated already be another UI API. So, these exceptions should
ideally never be thrown.
# If these are thrown, we should mark the TI state as failed.
msg = TaskState(
- state=TerminalTIState.FAILED,
+ state=TerminalTIState.FAIL_WITHOUT_RETRY,
Review Comment:
Now that I think about it, I'm not sure this exception can ever actually be
thrown: it looks like TI.heartbeat would notice the state before that point and throw
this exception "in" to the task? Not sure; let's look at that in a future PR.
##########
task_sdk/tests/execution_time/test_task_runner.py:
##########
@@ -256,6 +256,84 @@ def test_run_basic_skipped(time_machine, mocked_parse,
make_ti_context):
)
+def test_run_raises_base_exception(time_machine, mocked_parse,
make_ti_context):
+ """Test running a basic task that raises a base exception which should
send fail_with_retry state."""
+ from airflow.providers.standard.operators.python import PythonOperator
+
+ task = PythonOperator(
+ task_id="zero_division_error",
+ python_callable=lambda: 1 / 0,
+ )
+
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(),
+ task_id="zero_division_error",
+ dag_id="basic_dag_base_exception",
+ run_id="c",
+ try_number=1,
+ ),
+ file="",
+ requests_fd=0,
+ ti_context=make_ti_context(),
+ )
+
+ ti = mocked_parse(what, "basic_dag_base_exception", task)
+
+ instant = timezone.datetime(2024, 12, 3, 10, 0)
+ time_machine.move_to(instant, tick=False)
+
+ with mock.patch(
+ "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+ ) as mock_supervisor_comms:
+ run(ti, log=mock.MagicMock())
+
+ mock_supervisor_comms.send_request.assert_called_once_with(
+ msg=TaskState(
+ state=TerminalTIState.FAILED,
+ end_date=instant,
+ ),
+ log=mock.ANY,
+ )
+
+
+def test_startup_basic_templated_dag(mocked_parse, make_ti_context):
+ """Test running a DAG with templated task."""
+ from airflow.providers.standard.operators.bash import BashOperator
+
+ task = BashOperator(
+ task_id="templated_task",
+ bash_command="echo 'Logical date is {{ logical_date }}'",
+ )
+
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(), task_id="templated_task",
dag_id="basic_templated_dag", run_id="c", try_number=1
+ ),
+ file="",
+ requests_fd=0,
+ ti_context=make_ti_context(),
+ )
+ mocked_parse(what, "basic_templated_dag", task)
+
+ with mock.patch(
+ "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+ ) as mock_supervisor_comms:
Review Comment:
Will conflict with #45245 -- heads up @kaxil @amoghrajesh
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]