This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c4cee76051 fix: Handle Kubernetes API responses with non-JSON bodies (#55107)
6c4cee76051 is described below

commit 6c4cee760511f3fb35d162432bb52b1b353dd04c
Author: Gary Hsu <[email protected]>
AuthorDate: Sun Aug 31 17:02:08 2025 +0800

    fix: Handle Kubernetes API responses with non-JSON bodies (#55107)
    
    - Add robust JSON parsing with fallback for API exception bodies
    
    - Handle 429 responses and other cases where response body is plain text
    
    - Update KubernetesExecutor to safely parse ApiException.body
    
    - Update KubernetesInstallKueueOperator to handle non-JSON error bodies
---
 .../kubernetes/executors/kubernetes_executor.py    | 19 ++++++++++++++----
 .../providers/cncf/kubernetes/operators/kueue.py   | 19 ++++++++++++++++--
 .../executors/test_kubernetes_executor.py          | 21 ++++++++++++++++++++
 .../unit/cncf/kubernetes/operators/test_kueue.py   | 23 ++++++++++++++++++++++
 4 files changed, 76 insertions(+), 6 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index ee5dfa9db73..130d37b546c 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -368,12 +368,23 @@ class KubernetesExecutor(BaseExecutor):
                     )
                     self.fail(task[0], e)
                 except ApiException as e:
-                    body = json.loads(e.body)
+                    try:
+                        if e.body:
+                            body = json.loads(e.body)
+                        else:
+                            # If no body content, use reason as the message
+                            body = {"message": e.reason}
+                    except (json.JSONDecodeError, ValueError, TypeError):
+                        # If the body is a string (e.g., in a 429 error), it can't be parsed as JSON.
+                        # Use the body directly as the message instead.
+                        body = {"message": e.body}
+
                     retries = self.task_publish_retries[key]
                     # In case of exceeded quota or conflict errors, requeue 
the task as per the task_publish_max_retries
+                    message = body.get("message", "")
                     if (
-                        (str(e.status) == "403" and "exceeded quota" in 
body["message"])
-                        or (str(e.status) == "409" and "object has been 
modified" in body["message"])
+                        (str(e.status) == "403" and "exceeded quota" in 
message)
+                        or (str(e.status) == "409" and "object has been 
modified" in message)
                     ) and (self.task_publish_max_retries == -1 or retries < 
self.task_publish_max_retries):
                         self.log.warning(
                             "[Try %s of %s] Kube ApiException for Task: (%s). 
Reason: %r. Message: %s",
@@ -381,7 +392,7 @@ class KubernetesExecutor(BaseExecutor):
                             self.task_publish_max_retries,
                             key,
                             e.reason,
-                            body["message"],
+                            message,
                         )
                         self.task_queue.put(task)
                         self.task_publish_retries[key] = retries + 1
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
index e0c7c549e90..718f93428cf 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
@@ -65,12 +65,27 @@ class KubernetesInstallKueueOperator(BaseOperator):
         try:
             self.hook.apply_from_yaml_file(yaml_objects=yaml_objects)
         except FailToCreateError as ex:
-            error_bodies = [json.loads(e.body) for e in ex.api_exceptions]
+            error_bodies = []
+            for e in ex.api_exceptions:
+                try:
+                    if e.body:
+                        error_bodies.append(json.loads(e.body))
+                    else:
+                        # If no body content, use reason as the message
+                        reason = getattr(e, "reason", "Unknown")
+                        error_bodies.append({"message": reason, "reason": 
reason})
+                except (json.JSONDecodeError, ValueError, TypeError):
+                    # If the body is a string (e.g., in a 429 error), it can't 
be parsed as JSON.
+                    # Use the body directly as the message instead.
+                    error_bodies.append({"message": e.body, "reason": 
getattr(e, "reason", "Unknown")})
             if next((e for e in error_bodies if e.get("reason") == 
"AlreadyExists"), None):
                 self.log.info("Kueue is already enabled for the cluster")
 
             if errors := [e for e in error_bodies if e.get("reason") != 
"AlreadyExists"]:
-                error_message = "\n".join(e.get("body") for e in errors)
+                error_message = "\n".join(
+                    e.get("message") or e.get("body") or f"Unknown error: 
{e.get('reason', 'Unknown')}"
+                    for e in errors
+                )
                 raise AirflowException(error_message)
             return
 
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 3d6bb97167f..7fb6f790d87 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -386,6 +386,27 @@ class TestKubernetesExecutor:
                 State.SUCCESS,
                 id="409 conflict",
             ),
+            pytest.param(
+                HTTPResponse(body="Too many requests, please try again 
later.", status=429),
+                0,
+                False,
+                State.FAILED,
+                id="429 Too Many Requests (non-JSON body)",
+            ),
+            pytest.param(
+                HTTPResponse(body="Too many requests, please try again 
later.", status=429),
+                1,
+                False,
+                State.FAILED,
+                id="429 Too Many Requests (non-JSON body) 
(task_publish_max_retries=1)",
+            ),
+            pytest.param(
+                HTTPResponse(body="", status=429),
+                0,
+                False,
+                State.FAILED,
+                id="429 Too Many Requests (empty body)",
+            ),
         ],
     )
     
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py
index 8d43db4889f..595234df089 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py
@@ -115,6 +115,29 @@ class TestKubernetesInstallKueueOperator:
         
mock_hook.return_value.check_kueue_deployment_running.assert_not_called()
         mock_log.info.assert_called_once_with("Kueue is already enabled for 
the cluster")
 
+    
@mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesInstallKueueOperator.log"))
+    @mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesHook"))
+    def test_execute_non_json_response(self, mock_hook, mock_log):
+        """Test handling of non-JSON API response bodies (e.g., 429 errors)."""
+        mock_get_yaml_content_from_file = 
mock_hook.return_value.get_yaml_content_from_file
+        mock_yaml_objects = mock_get_yaml_content_from_file.return_value
+        mock_apply_from_yaml_file = mock_hook.return_value.apply_from_yaml_file
+
+        # Create mock exceptions with non-JSON bodies (simulating 429 errors)
+        api_exceptions = [
+            mock.MagicMock(body="Too many requests, please try again later.", 
reason="TooManyRequests"),
+            mock.MagicMock(body="", reason="RateLimited"),  # Empty body case
+        ]
+        mock_apply_from_yaml_file.side_effect = 
FailToCreateError(api_exceptions)
+        expected_error_message = "Too many requests, please try again 
later.\nRateLimited"
+
+        with pytest.raises(AirflowException, match=expected_error_message):
+            self.operator.execute(context=mock.MagicMock())
+
+        
mock_get_yaml_content_from_file.assert_called_once_with(kueue_yaml_url=KUEUE_YAML_URL)
+        
mock_apply_from_yaml_file.assert_called_once_with(yaml_objects=mock_yaml_objects)
+        
mock_hook.return_value.check_kueue_deployment_running.assert_not_called()
+
 
 class TestKubernetesStartKueueJobOperator:
     def test_template_fields(self):

Reply via email to