amoghrajesh commented on code in PR #45121:
URL: https://github.com/apache/airflow/pull/45121#discussion_r1895302869


##########
task_sdk/src/airflow/sdk/api/client.py:
##########
@@ -263,6 +267,14 @@ def noop_handler(request: httpx.Request) -> httpx.Response:
     return httpx.Response(200, json={"text": "Hello, world!"})
 
 
+# Config options for SDK how retries on HTTP requests should be handled
+# Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 
3:37 and fails after 5:07min
+# As long as there is no other config facility in SDK we use ENV for the moment
+API_RETRIES = int(os.getenv("AIRFLOW__WORKERS__API_RETRIES", 10))
+API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1))
+API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90))

Review Comment:
   Can we also briefly document this? I think these are new env variables right?



##########
task_sdk/src/airflow/sdk/api/client.py:
##########
@@ -263,6 +267,14 @@ def noop_handler(request: httpx.Request) -> httpx.Response:
     return httpx.Response(200, json={"text": "Hello, world!"})
 
 
+# Config options for SDK how retries on HTTP requests should be handled
+# Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 
3:37 and fails after 5:07min
+# As long as there is no other config facility in SDK we use ENV for the moment
+API_RETRIES = int(os.getenv("AIRFLOW__WORKERS__API_RETRIES", 10))
+API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1))
+API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90))
+

Review Comment:
   2. is already handled in the tenacity library. We do not have to explicitly 
handle that



##########
task_sdk/src/airflow/sdk/api/client.py:
##########
@@ -263,6 +267,14 @@ def noop_handler(request: httpx.Request) -> httpx.Response:
     return httpx.Response(200, json={"text": "Hello, world!"})
 
 
+# Config options for SDK how retries on HTTP requests should be handled
+# Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 
3:37 and fails after 5:07min
+# As long as there is no other config facility in SDK we use ENV for the moment

Review Comment:
   ```suggestion
   # So far, is no other config facility in SDK we use ENV for the moment
   # TODO: Replace with conf when available in task sdk
   ```



##########
task_sdk/tests/api/test_client.py:
##########
@@ -53,39 +64,92 @@ def handle_request(request: httpx.Request) -> 
httpx.Response:
         ]
 
     def test_error_parsing_plain_text(self):
-        def handle_request(request: httpx.Request) -> httpx.Response:
-            """
-            A transport handle that always returns errors
-            """
-
-            return httpx.Response(422, content=b"Internal Server Error")
-
-        client = Client(
-            base_url=None, dry_run=True, token="", mounts={"'http://": 
httpx.MockTransport(handle_request)}
-        )
+        responses = [httpx.Response(422, content=b"Internal Server Error")]
+        client = make_client_w_responses(responses)
 
         with pytest.raises(httpx.HTTPStatusError) as err:
             client.get("http://error";)
         assert not isinstance(err.value, ServerResponseError)
 
     def test_error_parsing_other_json(self):
-        def handle_request(request: httpx.Request) -> httpx.Response:
-            # Some other json than an error body.
-            return httpx.Response(404, json={"detail": "Not found"})
-
-        client = Client(
-            base_url=None, dry_run=True, token="", mounts={"'http://": 
httpx.MockTransport(handle_request)}
-        )
+        responses = [httpx.Response(404, json={"detail": "Not found"})]
+        client = make_client_w_responses(responses)
 
         with pytest.raises(ServerResponseError) as err:
             client.get("http://error";)
         assert err.value.args == ("Not found",)
         assert err.value.detail is None
 
+    @mock.patch("time.sleep", return_value=None)
+    def test_retry_handling_unrecoverable_error(self, mock_sleep):
+        responses: list[httpx.Response] = [
+            *[httpx.Response(500, text="Internal Server Error")] * 11,
+            httpx.Response(200, json={"detail": "Recovered from error - but 
will fail before"}),
+            httpx.Response(400, json={"detail": "Should not get here"}),
+        ]
+        client = make_client_w_responses(responses)
 
-def make_client(transport: httpx.MockTransport) -> Client:
-    """Get a client with a custom transport"""
-    return Client(base_url="test://server", token="", transport=transport)
+        with pytest.raises(httpx.HTTPStatusError) as err:
+            client.get("http://error";)
+        assert not isinstance(err.value, ServerResponseError)
+        assert len(responses) == 3
+        assert mock_sleep.call_count == 9
+
+    @mock.patch("time.sleep", return_value=None)
+    def test_retry_handling_recovered(self, mock_sleep):
+        responses: list[httpx.Response] = [
+            *[httpx.Response(500, text="Internal Server Error")] * 3,
+            httpx.Response(200, json={"detail": "Recovered from error"}),
+            httpx.Response(400, json={"detail": "Should not get here"}),
+        ]
+        client = make_client_w_responses(responses)
+
+        response = client.get("http://error";)
+        assert response.status_code == 200
+        assert len(responses) == 1
+        assert mock_sleep.call_count == 3
+
+    @mock.patch("time.sleep", return_value=None)
+    def test_retry_handling_overload(self, mock_sleep):
+        responses: list[httpx.Response] = [
+            httpx.Response(429, text="I am really busy atm, please back-off", 
headers={"Retry-After": "37"}),
+            httpx.Response(200, json={"detail": "Recovered from error"}),
+            httpx.Response(400, json={"detail": "Should not get here"}),
+        ]
+        client = make_client_w_responses(responses)
+
+        response = client.get("http://error";)
+        assert response.status_code == 200
+        assert len(responses) == 1
+        assert mock_sleep.call_count == 1
+        assert mock_sleep.call_args[0][0] == 37
+
+    @mock.patch("time.sleep", return_value=None)
+    def test_retry_handling_non_retry_error(self, mock_sleep):
+        responses: list[httpx.Response] = [
+            httpx.Response(422, json={"detail": "Somehow this is a bad 
request"}),
+            httpx.Response(400, json={"detail": "Should not get here"}),
+        ]
+        client = make_client_w_responses(responses)
+
+        with pytest.raises(ServerResponseError) as err:
+            client.get("http://error";)
+        assert len(responses) == 1
+        assert mock_sleep.call_count == 0
+        assert err.value.args == ("Somehow this is a bad request",)
+
+    @mock.patch("time.sleep", return_value=None)
+    def test_retry_handling_ok(self, mock_sleep):
+        responses: list[httpx.Response] = [
+            httpx.Response(200, json={"detail": "Recovered from error"}),
+            httpx.Response(400, json={"detail": "Should not get here"}),
+        ]
+        client = make_client_w_responses(responses)
+
+        response = client.get("http://error";)
+        assert response.status_code == 200
+        assert len(responses) == 1
+        assert mock_sleep.call_count == 0
 
 
 class TestTaskInstanceOperations:

Review Comment:
   Should we replace the client initialisation in all the other test classes to 
use `make_client_w_responses` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to