This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6c4cee76051 fix: Handle Kubernetes API responses with non-JSON bodies (#55107)
6c4cee76051 is described below
commit 6c4cee760511f3fb35d162432bb52b1b353dd04c
Author: Gary Hsu <[email protected]>
AuthorDate: Sun Aug 31 17:02:08 2025 +0800
fix: Handle Kubernetes API responses with non-JSON bodies (#55107)
- Add robust JSON parsing with fallback for API exception bodies
- Handle 429 responses and other cases where response body is plain text
- Update KubernetesExecutor to safely parse ApiException.body
- Update KubernetesInstallKueueOperator to handle non-JSON error bodies
---
.../kubernetes/executors/kubernetes_executor.py | 19 ++++++++++++++----
.../providers/cncf/kubernetes/operators/kueue.py | 19 ++++++++++++++++--
.../executors/test_kubernetes_executor.py | 21 ++++++++++++++++++++
.../unit/cncf/kubernetes/operators/test_kueue.py | 23 ++++++++++++++++++++++
4 files changed, 76 insertions(+), 6 deletions(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index ee5dfa9db73..130d37b546c 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -368,12 +368,23 @@ class KubernetesExecutor(BaseExecutor):
)
self.fail(task[0], e)
except ApiException as e:
- body = json.loads(e.body)
+ try:
+ if e.body:
+ body = json.loads(e.body)
+ else:
+ # If no body content, use reason as the message
+ body = {"message": e.reason}
+ except (json.JSONDecodeError, ValueError, TypeError):
+ # If the body is a string (e.g., in a 429 error), it
can't be parsed as JSON.
+ # Use the body directly as the message instead.
+ body = {"message": e.body}
+
retries = self.task_publish_retries[key]
# In case of exceeded quota or conflict errors, requeue
the task as per the task_publish_max_retries
+ message = body.get("message", "")
if (
- (str(e.status) == "403" and "exceeded quota" in
body["message"])
- or (str(e.status) == "409" and "object has been
modified" in body["message"])
+ (str(e.status) == "403" and "exceeded quota" in
message)
+ or (str(e.status) == "409" and "object has been
modified" in message)
) and (self.task_publish_max_retries == -1 or retries <
self.task_publish_max_retries):
self.log.warning(
"[Try %s of %s] Kube ApiException for Task: (%s).
Reason: %r. Message: %s",
@@ -381,7 +392,7 @@ class KubernetesExecutor(BaseExecutor):
self.task_publish_max_retries,
key,
e.reason,
- body["message"],
+ message,
)
self.task_queue.put(task)
self.task_publish_retries[key] = retries + 1
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
index e0c7c549e90..718f93428cf 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
@@ -65,12 +65,27 @@ class KubernetesInstallKueueOperator(BaseOperator):
try:
self.hook.apply_from_yaml_file(yaml_objects=yaml_objects)
except FailToCreateError as ex:
- error_bodies = [json.loads(e.body) for e in ex.api_exceptions]
+ error_bodies = []
+ for e in ex.api_exceptions:
+ try:
+ if e.body:
+ error_bodies.append(json.loads(e.body))
+ else:
+ # If no body content, use reason as the message
+ reason = getattr(e, "reason", "Unknown")
+ error_bodies.append({"message": reason, "reason":
reason})
+ except (json.JSONDecodeError, ValueError, TypeError):
+ # If the body is a string (e.g., in a 429 error), it can't
be parsed as JSON.
+ # Use the body directly as the message instead.
+ error_bodies.append({"message": e.body, "reason":
getattr(e, "reason", "Unknown")})
if next((e for e in error_bodies if e.get("reason") ==
"AlreadyExists"), None):
self.log.info("Kueue is already enabled for the cluster")
if errors := [e for e in error_bodies if e.get("reason") !=
"AlreadyExists"]:
- error_message = "\n".join(e.get("body") for e in errors)
+ error_message = "\n".join(
+ e.get("message") or e.get("body") or f"Unknown error:
{e.get('reason', 'Unknown')}"
+ for e in errors
+ )
raise AirflowException(error_message)
return
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 3d6bb97167f..7fb6f790d87 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -386,6 +386,27 @@ class TestKubernetesExecutor:
State.SUCCESS,
id="409 conflict",
),
+ pytest.param(
+ HTTPResponse(body="Too many requests, please try again
later.", status=429),
+ 0,
+ False,
+ State.FAILED,
+ id="429 Too Many Requests (non-JSON body)",
+ ),
+ pytest.param(
+ HTTPResponse(body="Too many requests, please try again
later.", status=429),
+ 1,
+ False,
+ State.FAILED,
+ id="429 Too Many Requests (non-JSON body)
(task_publish_max_retries=1)",
+ ),
+ pytest.param(
+ HTTPResponse(body="", status=429),
+ 0,
+ False,
+ State.FAILED,
+ id="429 Too Many Requests (empty body)",
+ ),
],
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py
index 8d43db4889f..595234df089 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py
@@ -115,6 +115,29 @@ class TestKubernetesInstallKueueOperator:
mock_hook.return_value.check_kueue_deployment_running.assert_not_called()
mock_log.info.assert_called_once_with("Kueue is already enabled for
the cluster")
+
@mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesInstallKueueOperator.log"))
+ @mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesHook"))
+ def test_execute_non_json_response(self, mock_hook, mock_log):
+ """Test handling of non-JSON API response bodies (e.g., 429 errors)."""
+ mock_get_yaml_content_from_file =
mock_hook.return_value.get_yaml_content_from_file
+ mock_yaml_objects = mock_get_yaml_content_from_file.return_value
+ mock_apply_from_yaml_file = mock_hook.return_value.apply_from_yaml_file
+
+ # Create mock exceptions with non-JSON bodies (simulating 429 errors)
+ api_exceptions = [
+ mock.MagicMock(body="Too many requests, please try again later.",
reason="TooManyRequests"),
+ mock.MagicMock(body="", reason="RateLimited"), # Empty body case
+ ]
+ mock_apply_from_yaml_file.side_effect =
FailToCreateError(api_exceptions)
+ expected_error_message = "Too many requests, please try again
later.\nRateLimited"
+
+ with pytest.raises(AirflowException, match=expected_error_message):
+ self.operator.execute(context=mock.MagicMock())
+
+
mock_get_yaml_content_from_file.assert_called_once_with(kueue_yaml_url=KUEUE_YAML_URL)
+
mock_apply_from_yaml_file.assert_called_once_with(yaml_objects=mock_yaml_objects)
+
mock_hook.return_value.check_kueue_deployment_running.assert_not_called()
+
class TestKubernetesStartKueueJobOperator:
def test_template_fields(self):