This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1ddadf59b8 Change `httpx` to `requests` in `file_task_handler` (#39799)
1ddadf59b8 is described below
commit 1ddadf59b8089f71d3c0f153aa62112d073039fc
Author: Youngha, Park <[email protected]>
AuthorDate: Sat Jun 15 19:12:35 2024 +0900
Change `httpx` to `requests` in `file_task_handler` (#39799)
* Change httpx to requests in file_task_handler
- httpx does not support CIDR ranges in NO_PROXY
- replace httpx with requests, which honors CIDR ranges in NO_PROXY; this resolves the issue
- related issue: https://github.com/apache/airflow/issues/39794
* Add a CIDR no_proxy test to test_log_handlers.py
* Use the monkeypatch fixture to set the proxy environment variables
---------
Co-authored-by: scott-py <[email protected]>
---
airflow/utils/log/file_task_handler.py | 8 +++---
tests/utils/test_log_handlers.py | 46 ++++++++++++++++++++++++++++++++++
2 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/airflow/utils/log/file_task_handler.py
b/airflow/utils/log/file_task_handler.py
index a8a1fffebf..24bad09aa3 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -86,7 +86,7 @@ def _set_task_deferred_context_var():
def _fetch_logs_from_service(url, log_relative_path):
# Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438
- import httpx
+ import requests  # unlike httpx, requests honors CIDR ranges in NO_PROXY (apache/airflow#39794)
from airflow.utils.jwt_signer import JWTSigner
@@ -96,7 +96,7 @@ def _fetch_logs_from_service(url, log_relative_path):
expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
audience="task-instance-logs",
)
- response = httpx.get(
+ response = requests.get(  # proxy settings (http_proxy / no_proxy) are read from the environment
url,
timeout=timeout,
headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
@@ -574,9 +574,9 @@ class FileTaskHandler(logging.Handler):
messages.append(f"Found logs served from host {url}")
logs.append(response.text)
except Exception as e:
- from httpx import UnsupportedProtocol
+ from requests.exceptions import InvalidURL  # raised by requests for a host-less URL
- if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
+ if isinstance(e, InvalidURL) and ti.task.inherits_from_empty_operator is True:
messages.append(self.inherits_from_empty_operator_log_message)
else:
messages.append(f"Could not read served logs: {e}")
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 101e62a06e..5c88fce0a6 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,6 +21,7 @@ import logging
import logging.config
import os
import re
+from http import HTTPStatus
from importlib import reload
from pathlib import Path
from unittest import mock
@@ -29,6 +30,7 @@ from unittest.mock import patch
import pendulum
import pytest
from kubernetes.client import models as k8s
+from requests.adapters import Response
from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
from airflow.exceptions import RemovedInAirflow3Warning
@@ -43,6 +45,7 @@ from airflow.operators.python import PythonOperator
from airflow.utils.log.file_task_handler import (
FileTaskHandler,
LogType,
+ _fetch_logs_from_service,
_interleave_logs,
_parse_timestamps_in_log_file,
)
@@ -779,3 +782,46 @@ def test_permissions_for_new_directories(tmp_path):
assert base_dir.stat().st_mode % 0o1000 == default_permissions
finally:
os.umask(old_umask)
+
+
+worker_url = "http://10.240.5.168:8793"  # worker IP: inside 10.0.0.0/8, not matched by "localhost"
+log_location = "dag_id=sample/run_id=manual__2024-05-23T07:18:59.298882+00:00/task_id=sourcing/attempt=1.log"
+log_url = f"{worker_url}/log/{log_location}"
+
+
+@mock.patch("requests.adapters.HTTPAdapter.send")  # intercept the request before it hits the network
+def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, monkeypatch):
+ monkeypatch.setenv("http_proxy", "http://proxy.example.com")
+ monkeypatch.setenv("no_proxy", "localhost")  # does not cover worker_url, so the proxy must be used
+
+ response = Response()
+ response.status_code = HTTPStatus.OK
+ mock_send.return_value = response
+
+ _fetch_logs_from_service(log_url, log_location)
+
+ mock_send.assert_called()
+ _, kwargs = mock_send.call_args
+ assert "proxies" in kwargs
+ proxies = kwargs["proxies"]
+ assert "http" in proxies
+ assert "no" in proxies  # urllib's getproxies() reports no_proxy under the "no" key
+
+
+@mock.patch("requests.adapters.HTTPAdapter.send")  # intercept the request before it hits the network
+def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch):
+ monkeypatch.setenv("http_proxy", "http://proxy.example.com")
+ monkeypatch.setenv("no_proxy", "10.0.0.0/8")  # CIDR range containing worker_url's 10.240.5.168
+
+ response = Response()
+ response.status_code = HTTPStatus.OK
+ mock_send.return_value = response
+
+ _fetch_logs_from_service(log_url, log_location)
+
+ mock_send.assert_called()
+ _, kwargs = mock_send.call_args
+ assert "proxies" in kwargs
+ proxies = kwargs["proxies"]
+ assert "http" not in proxies  # the CIDR match makes requests bypass the environment proxies
+ assert "no" not in proxies