This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f69f9fb6ed Generalize exception error_id tracing in RPC server (#39186)
f69f9fb6ed is described below
commit f69f9fb6ed40175dd8a21b4fd007aa88012e511f
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Apr 22 22:29:59 2024 -0700
Generalize exception error_id tracing in RPC server (#39186)
There were other ways in which the RPC server could fail and we should
provide a mechanism to find those logs too.
---
airflow/api_internal/endpoints/rpc_api_endpoint.py | 30 ++++++++++------------
.../endpoints/test_rpc_api_endpoint.py | 4 +--
2 files changed, 16 insertions(+), 18 deletions(-)
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index f2fa53d3ee..c5089718d6 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -107,19 +107,25 @@ def _initialize_map() -> dict[str, Callable]:
return {f"{func.__module__}.{func.__qualname__}": func for func in functions}
+def log_and_build_error_response(message, status):
+ error_id = uuid4()
+ server_message = message + f" error_id={error_id}"
+ log.exception(server_message)
+ client_message = message + f" The server side traceback may be identified with error_id={error_id}"
+ return Response(response=client_message, status=status)
+
+
def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
"""Handle Internal API /internal_api/v1/rpcapi endpoint."""
log.debug("Got request")
json_rpc = body.get("jsonrpc")
if json_rpc != "2.0":
- log.error("Not jsonrpc-2.0 request.")
- return Response(response="Expected jsonrpc 2.0 request.", status=400)
+ return log_and_build_error_response(message="Expected jsonrpc 2.0 request.", status=400)
methods_map = _initialize_map()
method_name = body.get("method")
if method_name not in methods_map:
- log.error("Unrecognized method: %s.", method_name)
- return Response(response=f"Unrecognized method: {method_name}.", status=400)
+ return log_and_build_error_response(message=f"Unrecognized method: {method_name}.", status=400)
handler = methods_map[method_name]
params = {}
@@ -127,12 +133,10 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
if body.get("params"):
params_json = body.get("params")
params = BaseSerialization.deserialize(params_json, use_pydantic_models=True)
- except Exception as e:
- log.error("Error when deserializing parameters for method: %s.", method_name)
- log.exception(e)
- return Response(response="Error deserializing parameters.", status=400)
+ except Exception:
+ return log_and_build_error_response(message="Error deserializing parameters.", status=400)
- log.debug("Calling method %s.", method_name)
+ log.debug("Calling method %s\nparams: %s", method_name, params)
try:
# Session must be created there as it may be needed by serializer for lazy-loaded fields.
with create_session() as session:
@@ -141,10 +145,4 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
response = json.dumps(output_json) if output_json is not None else None
return Response(response=response, headers={"Content-Type": "application/json"})
except Exception:
- error_id = uuid4()
- log.exception("Error executing method '%s'; error_id=%s.", method_name, error_id)
- return Response(
- response=f"Error executing method '{method_name}'. "
- f"The server side traceback may be identified with error_id={error_id}",
- status=500,
- )
+ return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)
diff --git a/tests/api_internal/endpoints/test_rpc_api_endpoint.py b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
index afa3bc9920..4c312da3a7 100644
--- a/tests/api_internal/endpoints/test_rpc_api_endpoint.py
+++ b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
@@ -149,7 +149,7 @@ class TestRpcApiEndpoint:
"/internal_api/v1/rpcapi", headers={"Content-Type":
"application/json"}, data=json.dumps(data)
)
assert response.status_code == 400
- assert response.data == b"Unrecognized method: i-bet-it-does-not-exist."
+ assert response.data.startswith(b"Unrecognized method: i-bet-it-does-not-exist.")
mock_test_method.assert_not_called()
def test_invalid_jsonrpc(self):
@@ -159,5 +159,5 @@ class TestRpcApiEndpoint:
"/internal_api/v1/rpcapi", headers={"Content-Type":
"application/json"}, data=json.dumps(data)
)
assert response.status_code == 400
- assert response.data == b"Expected jsonrpc 2.0 request."
+ assert response.data.startswith(b"Expected jsonrpc 2.0 request.")
mock_test_method.assert_not_called()