This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dd96be5468 Remove _read method from hdfs task handler after bumping
min airflow version to 2.6 (#36425)
dd96be5468 is described below
commit dd96be5468241eefeab4145e352764dee20d0f33
Author: Hussein Awala <[email protected]>
AuthorDate: Tue Dec 26 09:57:38 2023 +0100
Remove _read method from hdfs task handler after bumping min airflow
version to 2.6 (#36425)
---
.../providers/apache/hdfs/log/hdfs_task_handler.py | 28 ----------------------
1 file changed, 28 deletions(-)
diff --git a/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
b/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
index b685653dd6..2a62aeef13 100644
--- a/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
+++ b/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
@@ -103,31 +103,3 @@ class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
messages.append(f"No logs found on hdfs for ti={ti}")
return messages, logs
-
- def _read(self, ti, try_number, metadata=None):
- """
- Read logs of given task instance and try_number from HDFS.
-
- If failed, read the log from task instance host machine.
-
- todo: when min airflow version >= 2.6 then remove this method
(``_read``)
-
- :param ti: task instance object
- :param try_number: task instance try_number to read logs from
- :param metadata: log metadata,
- can be used for steaming log reading and auto-tailing.
- """
- # from airflow 2.6 we no longer implement the _read method
- if hasattr(super(), "_read_remote_logs"):
- return super()._read(ti, try_number, metadata)
- # if we get here, we're on airflow < 2.6 and we use this backcompat
logic
- messages, logs = self._read_remote_logs(ti, try_number, metadata)
- if logs:
- return "".join(f"*** {x}\n" for x in messages) + "\n".join(logs),
{"end_of_log": True}
- else:
- if metadata and metadata.get("log_pos", 0) > 0:
- log_prefix = ""
- else:
- log_prefix = "*** Falling back to local log\n"
- local_log, metadata = super()._read(ti, try_number, metadata)
- return f"{log_prefix}{local_log}", metadata