This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9f17981cab3 Fix `RayJobBaseOperator` polling to recognize STOPPED as terminal status (#64206)
9f17981cab3 is described below
commit 9f17981cab37b8a8a66991cf68cf2963de9f2fa1
Author: Elad Kalif <[email protected]>
AuthorDate: Wed Mar 25 12:45:02 2026 +0200
Fix `RayJobBaseOperator` polling to recognize STOPPED as terminal status (#64206)
---
.../google/src/airflow/providers/google/cloud/operators/ray.py | 2 +-
providers/google/tests/unit/google/cloud/operators/test_ray.py | 10 +++++++---
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/ray.py b/providers/google/src/airflow/providers/google/cloud/operators/ray.py
index c243c7d8e26..4a907e8170b 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/ray.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/ray.py
@@ -36,7 +36,7 @@ if TYPE_CHECKING:
from airflow.providers.common.compat.sdk import Context
-TERMINAL_STATUSES = {JobStatus.SUCCEEDED.value, JobStatus.FAILED.value}
+TERMINAL_STATUSES = {JobStatus.SUCCEEDED.value, JobStatus.FAILED.value, JobStatus.STOPPED.value}
class OperationFailedException(Exception):
diff --git a/providers/google/tests/unit/google/cloud/operators/test_ray.py b/providers/google/tests/unit/google/cloud/operators/test_ray.py
index 5d8aee1f172..04667142e38 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_ray.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_ray.py
@@ -165,9 +165,13 @@ class TestRaySubmitJobOperator:
},
)
+ @pytest.mark.parametrize(
+ "terminal_status",
+ [JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED],
+ )
@mock.patch(RAY_OP_PATH.format("time.sleep"))
@mock.patch(RAY_OP_PATH.format("RayJobHook"))
- def test_check_job_status_reaches_terminal(self, mock_hook_cls, mock_sleep):
+ def test_check_job_status_reaches_terminal(self, mock_hook_cls, mock_sleep, terminal_status):
mock_hook = mock_hook_cls.return_value
mock_hook.stop_job.return_value = True
@@ -181,11 +185,11 @@ class TestRaySubmitJobOperator:
get_job_logs=True,
)
operator.hook.get_job_status = mock.MagicMock(
- side_effect=[JobStatus.RUNNING, JobStatus.RUNNING, JobStatus.SUCCEEDED]
+ side_effect=[JobStatus.RUNNING, JobStatus.RUNNING, terminal_status]
)
status = operator._check_job_status("addr", "job", polling_interval=1, timeout=100)
- assert status == JobStatus.SUCCEEDED
+ assert status == terminal_status
assert mock_sleep.call_count == 2
@mock.patch(RAY_OP_PATH.format("time.sleep"))