This is an automated email from the ASF dual-hosted git repository.
joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b99e4ecb1c Clean OSS task handler code after bumping min Airflow
version to 2.6.0 (#36420)
b99e4ecb1c is described below
commit b99e4ecb1cfa4dbe719b4acf459b34237eb8cfda
Author: Hussein Awala <[email protected]>
AuthorDate: Tue Dec 26 03:36:46 2023 +0100
Clean OSS task handler code after bumping min Airflow version to 2.6.0
(#36420)
---
.../providers/alibaba/cloud/log/oss_task_handler.py | 18 ++----------------
.../alibaba/cloud/log/test_oss_task_handler.py | 9 +++------
2 files changed, 5 insertions(+), 22 deletions(-)
diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py
b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index e244c54e42..fc93fe87a5 100644
--- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -23,26 +23,12 @@ import pathlib
import shutil
from functools import cached_property
-from packaging.version import Version
-
from airflow.configuration import conf
from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
-def get_default_delete_local_copy():
- """Load delete_local_logs conf if Airflow version > 2.6 and return False
if not.
-
- TODO: delete this function when min airflow version >= 2.6
- """
- from airflow.version import version
-
- if Version(version) < Version("2.6"):
- return False
- return conf.getboolean("logging", "delete_local_logs")
-
-
class OSSTaskHandler(FileTaskHandler, LoggingMixin):
"""
OSSTaskHandler is a python log handler that handles and reads task
instance logs.
@@ -58,8 +44,8 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
self._hook = None
self.closed = False
self.upload_on_close = True
- self.delete_local_copy = (
- kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else
get_default_delete_local_copy()
+ self.delete_local_copy = kwargs.get(
+ "delete_local_copy", conf.getboolean("logging",
"delete_local_logs")
)
@cached_property
diff --git a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
index 3459f7f4a9..73beb035cf 100644
--- a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
+++ b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
@@ -155,8 +155,8 @@ class TestOSSTaskHandler:
)
@pytest.mark.parametrize(
- "delete_local_copy, expected_existence_of_local_copy, airflow_version",
- [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True,
"2.5.0"), (False, True, "2.5.0")],
+ "delete_local_copy, expected_existence_of_local_copy",
+ [(True, False), (False, True)],
)
@mock.patch(OSS_TASK_HANDLER_STRING.format("OSSTaskHandler.hook"),
new_callable=PropertyMock)
def test_close_with_delete_local_copy_conf(
@@ -165,12 +165,9 @@ class TestOSSTaskHandler:
tmp_path_factory,
delete_local_copy,
expected_existence_of_local_copy,
- airflow_version,
):
local_log_path = str(tmp_path_factory.mktemp("local-oss-log-location"))
- with conf_vars({("logging", "delete_local_logs"):
str(delete_local_copy)}), mock.patch(
- "airflow.version.version", airflow_version
- ):
+ with conf_vars({("logging", "delete_local_logs"):
str(delete_local_copy)}):
handler = OSSTaskHandler(local_log_path, self.oss_log_folder)
handler.log.info("test")