This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new a0a516bdff0 [v3-1-test] Fix task retry logic to respect retries for all exit codes (#58384) (#58478)
a0a516bdff0 is described below
commit a0a516bdff0b3cc7d62e87ddb58197c522d67347
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Nov 19 19:07:15 2025 +0530
[v3-1-test] Fix task retry logic to respect retries for all exit codes (#58384) (#58478)
(cherry picked from commit 93c7348)
---
.../src/airflow/sdk/execution_time/supervisor.py | 10 ++++++----
.../task_sdk/execution_time/test_supervisor.py | 21 ++++++++++++++++++---
2 files changed, 24 insertions(+), 7 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b3c93ba9654..176366fbda1 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1164,10 +1164,12 @@ class ActivitySubprocess(WatchedSubprocess):
if self._exit_code != 0 and self._terminal_state == SERVER_TERMINATED:
return SERVER_TERMINATED
- # Any negative exit code indicates a signal kill
- # We consider all signal kills as potentially retryable
- # since they're often transient issues that could succeed on retry
- if self._exit_code < 0 and self._should_retry:
+ # Any non zero exit code indicates a failure
+ # If retries are configured, mark as UP_FOR_RETRY
+ # Negative exit codes indicate signal kills (often transient)
+ # Positive exit codes can also be transient failures like network
issues in a task communicating to
+ # external services
+ if self._exit_code != 0 and self._should_retry:
return TaskInstanceState.UP_FOR_RETRY
return TaskInstanceState.FAILED
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 141bb1967e9..ba3c5166931 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2433,7 +2433,7 @@ def test_remote_logging_conn(remote_logging, remote_conn, expected_env, monkeypa
class TestSignalRetryLogic:
- """Test signal based retry logic in ActivitySubprocess."""
+ """Test retry logic for exit codes (signals and non-signal failures) in
ActivitySubprocess."""
@pytest.mark.parametrize(
"signal",
@@ -2486,8 +2486,8 @@ class TestSignalRetryLogic:
result = mock_watched_subprocess.final_state
assert result == TaskInstanceState.FAILED
- def test_non_signal_exit_code_goes_to_failed(self, mocker):
- """Test that non signal exit codes go to failed regardless of task
retries."""
+ def test_non_signal_exit_code_with_retry_goes_to_up_for_retry(self,
mocker):
+ """Test that non-signal exit codes with retries enabled go to
UP_FOR_RETRY."""
mock_watched_subprocess = ActivitySubprocess(
process_log=mocker.MagicMock(),
id=TI_ID,
@@ -2499,6 +2499,21 @@ class TestSignalRetryLogic:
mock_watched_subprocess._exit_code = 1
mock_watched_subprocess._should_retry = True
+ assert mock_watched_subprocess.final_state ==
TaskInstanceState.UP_FOR_RETRY
+
+ def test_non_signal_exit_code_without_retry_goes_to_failed(self, mocker):
+ """Test that non-signal exit codes without retries enabled go to
FAILED."""
+ mock_watched_subprocess = ActivitySubprocess(
+ process_log=mocker.MagicMock(),
+ id=TI_ID,
+ pid=12345,
+ stdin=mocker.Mock(),
+ process=mocker.Mock(),
+ client=mocker.Mock(),
+ )
+ mock_watched_subprocess._exit_code = 1
+ mock_watched_subprocess._should_retry = False
+
assert mock_watched_subprocess.final_state == TaskInstanceState.FAILED