This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ddadf59b8 Change `httpx` to `requests` in `file_task_handler` (#39799)
1ddadf59b8 is described below

commit 1ddadf59b8089f71d3c0f153aa62112d073039fc
Author: Youngha, Park <[email protected]>
AuthorDate: Sat Jun 15 19:12:35 2024 +0900

    Change `httpx` to `requests` in `file_task_handler` (#39799)
    
    * Change httpx to requests in file_task_handler
    
    - httpx does not support CIDRs in NO_PROXY
    - converting httpx to requests resolves the issue
    - related issue: https://github.com/apache/airflow/issues/39794
    
    * Add CIDR no_proxy test to test_log_handlers.py
    
    * Apply monkeypatch fixture
    
    ---------
    
    Co-authored-by: scott-py <[email protected]>
---
 airflow/utils/log/file_task_handler.py |  8 +++---
 tests/utils/test_log_handlers.py       | 46 ++++++++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index a8a1fffebf..24bad09aa3 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -86,7 +86,7 @@ def _set_task_deferred_context_var():
 
 def _fetch_logs_from_service(url, log_relative_path):
     # Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438
-    import httpx
+    import requests
 
     from airflow.utils.jwt_signer import JWTSigner
 
@@ -96,7 +96,7 @@ def _fetch_logs_from_service(url, log_relative_path):
         expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
         audience="task-instance-logs",
     )
-    response = httpx.get(
+    response = requests.get(
         url,
         timeout=timeout,
         headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
@@ -574,9 +574,9 @@ class FileTaskHandler(logging.Handler):
                 messages.append(f"Found logs served from host {url}")
                 logs.append(response.text)
         except Exception as e:
-            from httpx import UnsupportedProtocol
+            from requests.exceptions import InvalidSchema
 
-            if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
+            if isinstance(e, InvalidSchema) and ti.task.inherits_from_empty_operator is True:
                 messages.append(self.inherits_from_empty_operator_log_message)
             else:
                 messages.append(f"Could not read served logs: {e}")
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 101e62a06e..5c88fce0a6 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,6 +21,7 @@ import logging
 import logging.config
 import os
 import re
+from http import HTTPStatus
 from importlib import reload
 from pathlib import Path
 from unittest import mock
@@ -29,6 +30,7 @@ from unittest.mock import patch
 import pendulum
 import pytest
 from kubernetes.client import models as k8s
+from requests.adapters import Response
 
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.exceptions import RemovedInAirflow3Warning
@@ -43,6 +45,7 @@ from airflow.operators.python import PythonOperator
 from airflow.utils.log.file_task_handler import (
     FileTaskHandler,
     LogType,
+    _fetch_logs_from_service,
     _interleave_logs,
     _parse_timestamps_in_log_file,
 )
@@ -779,3 +782,46 @@ def test_permissions_for_new_directories(tmp_path):
         assert base_dir.stat().st_mode % 0o1000 == default_permissions
     finally:
         os.umask(old_umask)
+
+
+worker_url = "http://10.240.5.168:8793"
+log_location = "dag_id=sample/run_id=manual__2024-05-23T07:18:59.298882+00:00/task_id=sourcing/attempt=1.log"
+log_url = f"{worker_url}/log/{log_location}"
+
+
+@mock.patch("requests.adapters.HTTPAdapter.send")
+def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, monkeypatch):
+    monkeypatch.setenv("http_proxy", "http://proxy.example.com")
+    monkeypatch.setenv("no_proxy", "localhost")
+
+    response = Response()
+    response.status_code = HTTPStatus.OK
+    mock_send.return_value = response
+
+    _fetch_logs_from_service(log_url, log_location)
+
+    mock_send.assert_called()
+    _, kwargs = mock_send.call_args
+    assert "proxies" in kwargs
+    proxies = kwargs["proxies"]
+    assert "http" in proxies.keys()
+    assert "no" in proxies.keys()
+
+
+@mock.patch("requests.adapters.HTTPAdapter.send")
+def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch):
+    monkeypatch.setenv("http_proxy", "http://proxy.example.com")
+    monkeypatch.setenv("no_proxy", "10.0.0.0/8")
+
+    response = Response()
+    response.status_code = HTTPStatus.OK
+    mock_send.return_value = response
+
+    _fetch_logs_from_service(log_url, log_location)
+
+    mock_send.assert_called()
+    _, kwargs = mock_send.call_args
+    assert "proxies" in kwargs
+    proxies = kwargs["proxies"]
+    assert "http" not in proxies.keys()
+    assert "no" not in proxies.keys()

Reply via email to