This is an automated email from the ASF dual-hosted git repository. utkarsharma pushed a commit to branch v2-9-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 81dd4230328524aa5546a2dc42c28b65486fd094 Author: Youngha, Park <[email protected]> AuthorDate: Sat Jun 15 19:12:35 2024 +0900 Change `httpx` to `requests` in `file_task_handler` (#39799) * Change httpx to requests in file_task_handler - httpx does not support CIDRs in NO_PROXY - simply, convert httpx to requests, issues done - related issue: https://github.com/apache/airflow/issues/39794 * Add cidr no_proxy test test_log_handlers.py * Apply monkeypatch fixture --------- Co-authored-by: scott-py <[email protected]> (cherry picked from commit 1ddadf59b8089f71d3c0f153aa62112d073039fc) --- airflow/utils/log/file_task_handler.py | 8 +++--- tests/utils/test_log_handlers.py | 46 ++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 6d35b230b4..1a0ce116ad 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -80,7 +80,7 @@ def _set_task_deferred_context_var(): def _fetch_logs_from_service(url, log_relative_path): # Import occurs in function scope for perf. 
Ref: https://github.com/apache/airflow/pull/21438 - import httpx + import requests from airflow.utils.jwt_signer import JWTSigner @@ -90,7 +90,7 @@ def _fetch_logs_from_service(url, log_relative_path): expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30), audience="task-instance-logs", ) - response = httpx.get( + response = requests.get( url, timeout=timeout, headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})}, @@ -551,9 +551,9 @@ class FileTaskHandler(logging.Handler): messages.append(f"Found logs served from host {url}") logs.append(response.text) except Exception as e: - from httpx import UnsupportedProtocol + from requests.exceptions import InvalidSchema - if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True: + if isinstance(e, InvalidSchema) and ti.task.inherits_from_empty_operator is True: messages.append(self.inherits_from_empty_operator_log_message) else: messages.append(f"Could not read served logs: {e}") diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 13f1e4de0e..d31010e749 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -21,6 +21,7 @@ import logging import logging.config import os import re +from http import HTTPStatus from importlib import reload from pathlib import Path from unittest import mock @@ -29,6 +30,7 @@ from unittest.mock import patch import pendulum import pytest from kubernetes.client import models as k8s +from requests.adapters import Response from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.executors import executor_loader @@ -42,6 +44,7 @@ from airflow.operators.python import PythonOperator from airflow.utils.log.file_task_handler import ( FileTaskHandler, LogType, + _fetch_logs_from_service, _interleave_logs, _parse_timestamps_in_log_file, ) @@ -756,3 +759,46 @@ def 
test_permissions_for_new_directories(tmp_path): assert base_dir.stat().st_mode % 0o1000 == default_permissions finally: os.umask(old_umask) + + +worker_url = "http://10.240.5.168:8793" +log_location = "dag_id=sample/run_id=manual__2024-05-23T07:18:59.298882+00:00/task_id=sourcing/attempt=1.log" +log_url = f"{worker_url}/log/{log_location}" + + +@mock.patch("requests.adapters.HTTPAdapter.send") +def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, monkeypatch): + monkeypatch.setenv("http_proxy", "http://proxy.example.com") + monkeypatch.setenv("no_proxy", "localhost") + + response = Response() + response.status_code = HTTPStatus.OK + mock_send.return_value = response + + _fetch_logs_from_service(log_url, log_location) + + mock_send.assert_called() + _, kwargs = mock_send.call_args + assert "proxies" in kwargs + proxies = kwargs["proxies"] + assert "http" in proxies.keys() + assert "no" in proxies.keys() + + +@mock.patch("requests.adapters.HTTPAdapter.send") +def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch): + monkeypatch.setenv("http_proxy", "http://proxy.example.com") + monkeypatch.setenv("no_proxy", "10.0.0.0/8") + + response = Response() + response.status_code = HTTPStatus.OK + mock_send.return_value = response + + _fetch_logs_from_service(log_url, log_location) + + mock_send.assert_called() + _, kwargs = mock_send.call_args + assert "proxies" in kwargs + proxies = kwargs["proxies"] + assert "http" not in proxies.keys() + assert "no" not in proxies.keys()
