This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cdc7f19b57 Add retry logic for RPC calls (#38910)
cdc7f19b57 is described below

commit cdc7f19b571a99cbbad5091dcb11e2d4f1439fb3
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Apr 10 19:47:20 2024 -0700

    Add retry logic for RPC calls (#38910)
    
    I have found that when RPC server restarts it can take 30-60s for the 
server to be able to respond to RPC calls. This implements exponential wait for 
that case. 10 might seem excessive but I found that 3 or 5 didn't always do the 
trick.
---
 airflow/api_internal/internal_api_call.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/airflow/api_internal/internal_api_call.py 
b/airflow/api_internal/internal_api_call.py
index 8dcd5dba30..c3a67d03ee 100644
--- a/airflow/api_internal/internal_api_call.py
+++ b/airflow/api_internal/internal_api_call.py
@@ -19,10 +19,13 @@ from __future__ import annotations
 
 import inspect
 import json
+import logging
 from functools import wraps
 from typing import Callable, TypeVar
 
 import requests
+import tenacity
+from urllib3.exceptions import NewConnectionError
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException, AirflowException
@@ -32,6 +35,8 @@ from airflow.typing_compat import ParamSpec
 PS = ParamSpec("PS")
 RT = TypeVar("RT")
 
+logger = logging.getLogger(__name__)
+
 
 class InternalApiConfig:
     """Stores and caches configuration for Internal API."""
@@ -96,7 +101,14 @@ def internal_api_call(func: Callable[PS, RT]) -> 
Callable[PS, RT]:
     headers = {
         "Content-Type": "application/json",
     }
-
+    from requests.exceptions import ConnectionError
+
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(10),
+        wait=tenacity.wait_exponential(min=1),
+        retry=tenacity.retry_if_exception_type((NewConnectionError, 
ConnectionError)),
+        before_sleep=tenacity.before_log(logger, logging.WARNING),
+    )
     def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
         data = {"jsonrpc": "2.0", "method": method_name, "params": params_json}
         internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint()

Reply via email to