This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cda6f6cf2e2bc063be6a0ef4afdd876f97af71de
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Nov 19 19:07:15 2025 +0530

    [v3-1-test] Fix task retry logic to respect retries for all exit codes (#58384) (#58478)

    (cherry picked from commit 93c7348)
---
 .../src/airflow/sdk/execution_time/supervisor.py | 10 ++++++----
 .../task_sdk/execution_time/test_supervisor.py | 21 ++++++++++++++++++---
 2 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b3c93ba9654..176366fbda1 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1164,10 +1164,12 @@ class ActivitySubprocess(WatchedSubprocess):
         if self._exit_code != 0 and self._terminal_state == SERVER_TERMINATED:
             return SERVER_TERMINATED
 
-        # Any negative exit code indicates a signal kill
-        # We consider all signal kills as potentially retryable
-        # since they're often transient issues that could succeed on retry
-        if self._exit_code < 0 and self._should_retry:
+        # Any non zero exit code indicates a failure
+        # If retries are configured, mark as UP_FOR_RETRY
+        # Negative exit codes indicate signal kills (often transient)
+        # Positive exit codes can also be transient failures like network issues in a task communicating to
+        # external services
+        if self._exit_code != 0 and self._should_retry:
             return TaskInstanceState.UP_FOR_RETRY
 
         return TaskInstanceState.FAILED
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 141bb1967e9..ba3c5166931 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2433,7 +2433,7 @@ def test_remote_logging_conn(remote_logging, remote_conn, expected_env, monkeypa
 
 
 class TestSignalRetryLogic:
-    """Test signal based retry logic in ActivitySubprocess."""
+    """Test retry logic for exit codes (signals and non-signal failures) in ActivitySubprocess."""
 
     @pytest.mark.parametrize(
         "signal",
@@ -2486,8 +2486,8 @@ class TestSignalRetryLogic:
         result = mock_watched_subprocess.final_state
         assert result == TaskInstanceState.FAILED
 
-    def test_non_signal_exit_code_goes_to_failed(self, mocker):
-        """Test that non signal exit codes go to failed regardless of task retries."""
+    def test_non_signal_exit_code_with_retry_goes_to_up_for_retry(self, mocker):
+        """Test that non-signal exit codes with retries enabled go to UP_FOR_RETRY."""
         mock_watched_subprocess = ActivitySubprocess(
             process_log=mocker.MagicMock(),
             id=TI_ID,
@@ -2499,6 +2499,21 @@ class TestSignalRetryLogic:
         mock_watched_subprocess._exit_code = 1
         mock_watched_subprocess._should_retry = True
 
+        assert mock_watched_subprocess.final_state == TaskInstanceState.UP_FOR_RETRY
+
+    def test_non_signal_exit_code_without_retry_goes_to_failed(self, mocker):
+        """Test that non-signal exit codes without retries enabled go to FAILED."""
+        mock_watched_subprocess = ActivitySubprocess(
+            process_log=mocker.MagicMock(),
+            id=TI_ID,
+            pid=12345,
+            stdin=mocker.Mock(),
+            process=mocker.Mock(),
+            client=mocker.Mock(),
+        )
+        mock_watched_subprocess._exit_code = 1
+        mock_watched_subprocess._should_retry = False
+
         assert mock_watched_subprocess.final_state == TaskInstanceState.FAILED
