This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f27357fa415ff23021073a3a8c218fe9b99a143b
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Sat Feb 12 11:40:29 2022 +0800

    Use compat data interval shim in log handlers (#21289)

    (cherry picked from commit 44bd211b19dcb75eeb53ced5bea2cf0c80654b1a)
---
 .../providers/elasticsearch/log/es_task_handler.py | 27 ++++++++++++-----
 airflow/utils/log/file_task_handler.py             | 35 +++++++++++++++++-----
 2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cd08971..b591aef 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -101,15 +101,25 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         self.context_set = False
 
     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
-        dag_run = ti.dag_run
+        dag_run = ti.get_dagrun()
+        try:
+            data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+        except AttributeError:  # ti.task is not always set.
+            data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
         if self.json_format:
-            data_interval_start = self._clean_date(dag_run.data_interval_start)
-            data_interval_end = self._clean_date(dag_run.data_interval_end)
+            data_interval_start = self._clean_date(data_interval[0])
+            data_interval_end = self._clean_date(data_interval[1])
             execution_date = self._clean_date(dag_run.execution_date)
         else:
-            data_interval_start = dag_run.data_interval_start.isoformat()
-            data_interval_end = dag_run.data_interval_end.isoformat()
+            if data_interval[0]:
+                data_interval_start = data_interval[0].isoformat()
+            else:
+                data_interval_start = ""
+            if data_interval[1]:
+                data_interval_end = data_interval[1].isoformat()
+            else:
+                data_interval_end = ""
             execution_date = dag_run.execution_date.isoformat()
 
         return self.log_id_template.format(
@@ -123,14 +133,15 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         )
 
     @staticmethod
-    def _clean_date(value: datetime) -> str:
+    def _clean_date(value: Optional[datetime]) -> str:
         """
         Clean up a date value so that it is safe to query in elasticsearch
         by removing reserved characters.
-        # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
-        :param execution_date: execution date of the dag run.
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
         """
+        if value is None:
+            return ""
         return value.strftime("%Y_%m_%dT%H_%M_%S_%f")
 
     def _group_logs_by_host(self, logs):
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6e57c67..e13b8d4 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -18,8 +18,9 @@
 """File logging handler for tasks."""
 import logging
 import os
+from datetime import datetime
 from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Tuple
 
 import httpx
 from itsdangerous import TimedJSONWebSignatureSerializer
@@ -82,13 +83,31 @@ class FileTaskHandler(logging.Handler):
             context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
             context["try_number"] = try_number
             return render_template_to_string(self.filename_jinja_template, context)
-
-        return self.filename_template.format(
-            dag_id=ti.dag_id,
-            task_id=ti.task_id,
-            execution_date=ti.get_dagrun().logical_date.isoformat(),
-            try_number=try_number,
-        )
+        elif self.filename_template:
+            dag_run = ti.get_dagrun()
+            try:
+                data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+            except AttributeError:  # ti.task is not always set.
+                data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
+            if data_interval[0]:
+                data_interval_start = data_interval[0].isoformat()
+            else:
+                data_interval_start = ""
+            if data_interval[1]:
+                data_interval_end = data_interval[1].isoformat()
+            else:
+                data_interval_end = ""
+            return self.filename_template.format(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                run_id=ti.run_id,
+                data_interval_start=data_interval_start,
+                data_interval_end=data_interval_end,
+                execution_date=ti.get_dagrun().logical_date.isoformat(),
+                try_number=try_number,
+            )
+        else:
+            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
 
     def _read_grouped_logs(self):
         return False
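
For context on the change itself: both handlers now resolve the data interval
through the same fallback pattern. A minimal standalone sketch of that shim is
below; the helper name get_compat_data_interval is hypothetical (the committed
code inlines this logic in each handler), and a real TaskInstance is assumed:

    from datetime import datetime
    from typing import Optional, Tuple

    def get_compat_data_interval(ti) -> Tuple[Optional[datetime], Optional[datetime]]:
        # Prefer the timetable-aware calculation on the DAG object. When
        # ti.task (and therefore ti.task.dag) is not populated, e.g. in a
        # webserver process serving logs without a parsed DAG, fall back to
        # the values stored on the DagRun row, which may be None for runs
        # created before the data-interval columns existed.
        dag_run = ti.get_dagrun()
        try:
            return ti.task.dag.get_run_data_interval(dag_run)
        except AttributeError:  # ti.task is not always set.
            return (dag_run.data_interval_start, dag_run.data_interval_end)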
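
The None guard added to _clean_date can be exercised on its own. A minimal
sketch of the same sanitisation: strftime with underscores yields a value free
of characters reserved by the Elasticsearch query string syntax (such as -, :
and .), and a missing date maps to the empty string:

    from datetime import datetime
    from typing import Optional

    def clean_date(value: Optional[datetime]) -> str:
        # Same logic as ElasticsearchTaskHandler._clean_date in the patch.
        if value is None:
            return ""
        return value.strftime("%Y_%m_%dT%H_%M_%S_%f")

    print(clean_date(datetime(2022, 2, 12, 11, 40, 29)))  # 2022_02_12T11_40_29_000000
    print(clean_date(None))                               # prints an empty line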

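One design note on the file_task_handler change: str.format ignores keyword
arguments that a template never references, so passing the new run_id and
data_interval_* fields is backwards compatible with templates written before
this patch. A small sketch with a hypothetical str.format-style template (real
deployments configure [logging] log_filename_template in airflow.cfg):

    # Template that predates the new fields; the extra keyword arguments
    # below are silently ignored by str.format.
    filename_template = "{dag_id}/{task_id}/{execution_date}/{try_number}.log"

    print(
        filename_template.format(
            dag_id="example_dag",
            task_id="example_task",
            run_id="scheduled__2022-02-12T00:00:00+00:00",
            data_interval_start="2022-02-12T00:00:00+00:00",
            data_interval_end="2022-02-13T00:00:00+00:00",
            execution_date="2022-02-12T00:00:00+00:00",
            try_number=1,
        )
    )
    # example_dag/example_task/2022-02-12T00:00:00+00:00/1.log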