This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cdc7f19b57 Add retry logic for RPC calls (#38910)
cdc7f19b57 is described below
commit cdc7f19b571a99cbbad5091dcb11e2d4f1439fb3
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Apr 10 19:47:20 2024 -0700
Add retry logic for RPC calls (#38910)
I have found that when the RPC server restarts, it can take 30-60s for the
server to be able to respond to RPC calls. This implements retries with an
exponential wait for that case. 10 attempts might seem excessive, but I found
that 3 or 5 didn't always do the trick.
---
airflow/api_internal/internal_api_call.py | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
diff --git a/airflow/api_internal/internal_api_call.py
b/airflow/api_internal/internal_api_call.py
index 8dcd5dba30..c3a67d03ee 100644
--- a/airflow/api_internal/internal_api_call.py
+++ b/airflow/api_internal/internal_api_call.py
@@ -19,10 +19,13 @@ from __future__ import annotations
import inspect
import json
+import logging
from functools import wraps
from typing import Callable, TypeVar
import requests
+import tenacity
+from urllib3.exceptions import NewConnectionError
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
@@ -32,6 +35,8 @@ from airflow.typing_compat import ParamSpec
PS = ParamSpec("PS")
RT = TypeVar("RT")
+logger = logging.getLogger(__name__)
+
class InternalApiConfig:
"""Stores and caches configuration for Internal API."""
@@ -96,7 +101,14 @@ def internal_api_call(func: Callable[PS, RT]) ->
Callable[PS, RT]:
headers = {
"Content-Type": "application/json",
}
-
+ from requests.exceptions import ConnectionError
+
+ @tenacity.retry(
+ stop=tenacity.stop_after_attempt(10),
+ wait=tenacity.wait_exponential(min=1),
+ retry=tenacity.retry_if_exception_type((NewConnectionError,
ConnectionError)),
+ before_sleep=tenacity.before_log(logger, logging.WARNING),
+ )
def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
data = {"jsonrpc": "2.0", "method": method_name, "params": params_json}
internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint()