This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d02b66523f2 Fix SageMaker processing stopped state handling (#67291)
d02b66523f2 is described below
commit d02b66523f2985f63fd9b55443fbcf3571543a5e
Author: Aditya Patel <[email protected]>
AuthorDate: Fri May 22 08:15:15 2026 -0400
Fix SageMaker processing stopped state handling (#67291)
---
.../providers/amazon/aws/operators/sagemaker.py | 2 +-
.../aws/operators/test_sagemaker_processing.py | 28 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
index c4fadec6a16..c99228a3e42 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
@@ -335,7 +335,7 @@ class SageMakerProcessingOperator(SageMakerBaseOperator):
if self.deferrable and self.wait_for_completion:
response =
self.hook.describe_processing_job(self.config["ProcessingJobName"])
status = response["ProcessingJobStatus"]
- if status in self.hook.failed_states:
+ if status in self.hook.processing_job_failed_states:
raise AirflowException(f"SageMaker job failed because
{response['FailureReason']}")
if status == "Completed":
self.log.info("%s completed successfully.", self.task_id)
diff --git
a/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py
index a18058c745e..3bddcf88f3c 100644
---
a/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py
+++
b/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py
@@ -319,6 +319,34 @@ class TestSageMakerProcessingOperator:
assert not mock_defer.called
+
@mock.patch("airflow.providers.amazon.aws.operators.sagemaker.SageMakerProcessingOperator.defer")
+ @mock.patch.object(
+ SageMakerHook,
+ "describe_processing_job",
+ return_value={"ProcessingJobStatus": "Stopped", "FailureReason": "It
stopped"},
+ )
+ @mock.patch.object(
+ SageMakerHook,
+ "create_processing_job",
+ return_value={"ProcessingJobArn": "test_arn", "ResponseMetadata":
{"HTTPStatusCode": 200}},
+ )
+ @mock.patch.object(SageMakerBaseOperator, "_check_if_job_exists",
return_value=False)
+ def test_operator_stopped_before_defer(
+ self,
+ mock_job_exists,
+ mock_processing,
+ mock_describe,
+ mock_defer,
+ ):
+ sagemaker_operator = SageMakerProcessingOperator(
+ **self.defer_processing_config_kwargs,
+ config=CREATE_PROCESSING_PARAMS,
+ )
+ with pytest.raises(AirflowException):
+ sagemaker_operator.execute(context=None)
+
+ assert not mock_defer.called
+
@mock.patch.object(
SageMakerHook,
"describe_processing_job",