This is an automated email from the ASF dual-hosted git repository.
joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 72a6264e20 Clean WASB task handler code after bumping min Airflow
version to 2.6.0 (#36421)
72a6264e20 is described below
commit 72a6264e20a4ed8a0704ba16fe59b228d88f3edb
Author: Hussein Awala <[email protected]>
AuthorDate: Tue Dec 26 03:36:14 2023 +0100
Clean WASB task handler code after bumping min Airflow version to 2.6.0
(#36421)
---
.../providers/microsoft/azure/log/wasb_task_handler.py | 17 ++---------------
.../microsoft/azure/log/test_wasb_task_handler.py | 9 +++------
2 files changed, 5 insertions(+), 21 deletions(-)
diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index c57de1acb1..a174f29cec 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -24,7 +24,6 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any
from azure.core.exceptions import HttpResponseError
-from packaging.version import Version
from airflow.configuration import conf
from airflow.utils.log.file_task_handler import FileTaskHandler
@@ -36,18 +35,6 @@ if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
-def get_default_delete_local_copy():
- """Load delete_local_logs conf if Airflow version > 2.6 and return False
if not.
-
- TODO: delete this function when min airflow version >= 2.6
- """
- from airflow.version import version
-
- if Version(version) < Version("2.6"):
- return False
- return conf.getboolean("logging", "delete_local_logs")
-
-
class WasbTaskHandler(FileTaskHandler, LoggingMixin):
"""
WasbTaskHandler is a python log handler that handles and reads task
instance logs.
@@ -73,8 +60,8 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
self.log_relative_path = ""
self.closed = False
self.upload_on_close = True
- self.delete_local_copy = (
- kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else
get_default_delete_local_copy()
+ self.delete_local_copy = kwargs.get(
+ "delete_local_copy", conf.getboolean("logging",
"delete_local_logs")
)
@cached_property
diff --git a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
index f86c5f71f1..e74efe89e9 100644
--- a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
+++ b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
@@ -179,8 +179,8 @@ class TestWasbTaskHandler:
)
@pytest.mark.parametrize(
- "delete_local_copy, expected_existence_of_local_copy, airflow_version",
- [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True,
"2.5.0"), (False, True, "2.5.0")],
+ "delete_local_copy, expected_existence_of_local_copy",
+ [(True, False), (False, True)],
)
@mock.patch("airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler.wasb_write")
def test_close_with_delete_local_logs_conf(
@@ -190,11 +190,8 @@ class TestWasbTaskHandler:
tmp_path_factory,
delete_local_copy,
expected_existence_of_local_copy,
- airflow_version,
):
- with conf_vars({("logging", "delete_local_logs"):
str(delete_local_copy)}), mock.patch(
- "airflow.version.version", airflow_version
- ):
+ with conf_vars({("logging", "delete_local_logs"):
str(delete_local_copy)}):
handler = WasbTaskHandler(
base_log_folder=str(tmp_path_factory.mktemp("local-s3-log-location")),
wasb_log_folder=self.wasb_log_folder,