This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f69f9fb6ed Generalize exception error_id tracing in RPC server (#39186)
f69f9fb6ed is described below

commit f69f9fb6ed40175dd8a21b4fd007aa88012e511f
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Apr 22 22:29:59 2024 -0700

    Generalize exception error_id tracing in RPC server (#39186)
    
    There were other ways in which the RPC server could fail and we should provide a mechanism to find those logs too.
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py | 30 ++++++++++------------
 .../endpoints/test_rpc_api_endpoint.py             |  4 +--
 2 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index f2fa53d3ee..c5089718d6 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -107,19 +107,25 @@ def _initialize_map() -> dict[str, Callable]:
     return {f"{func.__module__}.{func.__qualname__}": func for func in functions}
 
 
+def log_and_build_error_response(message, status):
+    error_id = uuid4()
+    server_message = message + f" error_id={error_id}"
+    log.exception(server_message)
+    client_message = message + f" The server side traceback may be identified with error_id={error_id}"
+    return Response(response=client_message, status=status)
+
+
 def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
     """Handle Internal API /internal_api/v1/rpcapi endpoint."""
     log.debug("Got request")
     json_rpc = body.get("jsonrpc")
     if json_rpc != "2.0":
-        log.error("Not jsonrpc-2.0 request.")
-        return Response(response="Expected jsonrpc 2.0 request.", status=400)
+        return log_and_build_error_response(message="Expected jsonrpc 2.0 request.", status=400)
 
     methods_map = _initialize_map()
     method_name = body.get("method")
     if method_name not in methods_map:
-        log.error("Unrecognized method: %s.", method_name)
-        return Response(response=f"Unrecognized method: {method_name}.", status=400)
+        return log_and_build_error_response(message=f"Unrecognized method: {method_name}.", status=400)
 
     handler = methods_map[method_name]
     params = {}
@@ -127,12 +133,10 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
         if body.get("params"):
             params_json = body.get("params")
             params = BaseSerialization.deserialize(params_json, use_pydantic_models=True)
-    except Exception as e:
-        log.error("Error when deserializing parameters for method: %s.", method_name)
-        log.exception(e)
-        return Response(response="Error deserializing parameters.", status=400)
+    except Exception:
+        return log_and_build_error_response(message="Error deserializing parameters.", status=400)
 
-    log.debug("Calling method %s.", method_name)
+    log.debug("Calling method %s\nparams: %s", method_name, params)
     try:
         # Session must be created there as it may be needed by serializer for lazy-loaded fields.
         with create_session() as session:
@@ -141,10 +145,4 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
             response = json.dumps(output_json) if output_json is not None else None
             return Response(response=response, headers={"Content-Type": "application/json"})
     except Exception:
-        error_id = uuid4()
-        log.exception("Error executing method '%s'; error_id=%s.", method_name, error_id)
-        return Response(
-            response=f"Error executing method '{method_name}'. "
-            f"The server side traceback may be identified with error_id={error_id}",
-            status=500,
-        )
+        return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)
diff --git a/tests/api_internal/endpoints/test_rpc_api_endpoint.py b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
index afa3bc9920..4c312da3a7 100644
--- a/tests/api_internal/endpoints/test_rpc_api_endpoint.py
+++ b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
@@ -149,7 +149,7 @@ class TestRpcApiEndpoint:
             "/internal_api/v1/rpcapi", headers={"Content-Type": "application/json"}, data=json.dumps(data)
         )
         assert response.status_code == 400
-        assert response.data == b"Unrecognized method: i-bet-it-does-not-exist."
+        assert response.data.startswith(b"Unrecognized method: i-bet-it-does-not-exist.")
         mock_test_method.assert_not_called()
 
     def test_invalid_jsonrpc(self):
@@ -159,5 +159,5 @@ class TestRpcApiEndpoint:
             "/internal_api/v1/rpcapi", headers={"Content-Type": "application/json"}, data=json.dumps(data)
         )
         assert response.status_code == 400
-        assert response.data == b"Expected jsonrpc 2.0 request."
+        assert response.data.startswith(b"Expected jsonrpc 2.0 request.")
         mock_test_method.assert_not_called()

Reply via email to