Repository: incubator-airflow Updated Branches: refs/heads/master 32750601a -> 4c674ccff
[AIRFLOW-1564] Use Jinja2 to render logging filename. Still backwards compatible with Python str.format-style templates. Closes #2565 from NielsZeilemaker/AIRFLOW-1564 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4c674ccf Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4c674ccf Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4c674ccf Branch: refs/heads/master Commit: 4c674ccffda1fbc38b8cc044b0e2c004422a2035 Parents: 3275060 Author: Niels Zeilemaker <[email protected]> Authored: Wed Sep 6 13:41:20 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Sep 6 13:41:20 2017 +0200 ---------------------------------------------------------------------- .../config_templates/default_airflow_logging.py | 2 +- airflow/utils/log/file_task_handler.py | 25 +++++++++++++++----- airflow/utils/log/gcs_task_handler.py | 8 ++----- airflow/utils/log/s3_task_handler.py | 8 ++----- tests/utils/test_log_handlers.py | 23 ++++++++++++++++++ 5 files changed, 47 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/config_templates/default_airflow_logging.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py index d6ae036..523b4e8 100644 --- a/airflow/config_templates/default_airflow_logging.py +++ b/airflow/config_templates/default_airflow_logging.py @@ -37,7 +37,7 @@ if REMOTE_BASE_LOG_FOLDER.startswith('s3:/'): elif REMOTE_BASE_LOG_FOLDER.startswith('gs:/'): GCS_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER -FILENAME_TEMPLATE = '{dag_id}/{task_id}/{execution_date}/{try_number}.log' +FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log' DEFAULT_LOGGING_CONFIG = { 'version': 1, 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/utils/log/file_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index bce974c..7392aae 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -15,6 +15,8 @@ import logging import os +from jinja2 import Template + from airflow import configuration as conf from airflow.configuration import AirflowConfigException from airflow.utils.file import mkdirs @@ -37,6 +39,10 @@ class FileTaskHandler(logging.Handler): self.handler = None self.local_base = base_log_folder self.filename_template = filename_template + self.filename_jinja_template = None + + if "{{" in self.filename_template: #jinja mode + self.filename_jinja_template = Template(self.filename_template) def set_context(self, ti): """ @@ -59,6 +65,17 @@ class FileTaskHandler(logging.Handler): def close(self): if self.handler is not None: self.handler.close() + + def _render_filename(self, ti, try_number): + if self.filename_jinja_template: + jinja_context = ti.get_template_context() + jinja_context['try_number'] = try_number + return self.filename_jinja_template.render(**jinja_context) + + return self.filename_template.format(dag_id=ti.dag_id, + task_id=ti.task_id, + execution_date=ti.execution_date.isoformat(), + try_number=try_number) def _read(self, ti, try_number): """ @@ -71,9 +88,7 @@ class FileTaskHandler(logging.Handler): # Task instance here might be different from task instance when # initializing the handler. Thus explicitly getting log location # is needed to get correct log path. 
- log_relative_path = self.filename_template.format( - dag_id=ti.dag_id, task_id=ti.task_id, - execution_date=ti.execution_date.isoformat(), try_number=try_number + 1) + log_relative_path = self._render_filename(ti, try_number + 1) loc = os.path.join(self.local_base, log_relative_path) log = "" @@ -153,9 +168,7 @@ class FileTaskHandler(logging.Handler): # writable by both users, then it's possible that re-running a task # via the UI (or vice versa) results in a permission error as the task # tries to write to a log file created by the other user. - relative_path = self.filename_template.format( - dag_id=ti.dag_id, task_id=ti.task_id, - execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1) + relative_path = self._render_filename(ti, ti.try_number + 1) full_path = os.path.join(self.local_base, relative_path) directory = os.path.dirname(full_path) # Create the log file and give it group writable permissions http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/utils/log/gcs_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py index 5b35907..c340f10 100644 --- a/airflow/utils/log/gcs_task_handler.py +++ b/airflow/utils/log/gcs_task_handler.py @@ -39,9 +39,7 @@ class GCSTaskHandler(FileTaskHandler): # Log relative path is used to construct local and remote # log path to upload log files into GCS and read from the # remote location. - self.log_relative_path = self.filename_template( - dag_id=ti.dag_id, task_id=ti.task_id, - execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1) + self.log_relative_path = self._render_filename(ti, ti.try_number + 1) def close(self): """ @@ -76,9 +74,7 @@ class GCSTaskHandler(FileTaskHandler): # Explicitly getting log relative path is necessary as the given # task instance might be different than task instance passed in # in set_context method. 
- log_relative_path = self.filename_template.format( - dag_id=ti.dag_id, task_id=ti.task_id, - execution_date=ti.execution_date.isoformat(), try_number=try_number + 1) + log_relative_path = self._render_filename(ti, try_number + 1) remote_loc = os.path.join(self.remote_base, log_relative_path) gcs_log = logging_utils.GCSLog() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/utils/log/s3_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index 7268d22..51baaac 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -35,9 +35,7 @@ class S3TaskHandler(FileTaskHandler): super(S3TaskHandler, self).set_context(ti) # Local location and remote location is needed to open and # upload local log file to S3 remote storage. - self.log_relative_path = self.filename_template.format( - dag_id=ti.dag_id, task_id=ti.task_id, - execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1) + self.log_relative_path = self._render_filename(ti, ti.try_number + 1) def close(self): """ @@ -72,9 +70,7 @@ class S3TaskHandler(FileTaskHandler): # Explicitly getting log relative path is necessary as the given # task instance might be different than task instance passed in # in set_context method. 
- log_relative_path = self.filename_template.format( - dag_id=ti.dag_id, task_id=ti.task_id, - execution_date=ti.execution_date.isoformat(), try_number=try_number + 1) + log_relative_path = self._render_filename(ti, try_number + 1) remote_loc = os.path.join(self.remote_base, log_relative_path) s3_log = logging_utils.S3Log() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/tests/utils/test_log_handlers.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 5b0d8a6..8337c5d 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -23,6 +23,7 @@ from datetime import datetime from airflow.models import TaskInstance, DAG from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG from airflow.operators.dummy_operator import DummyOperator +from airflow.utils.log.file_task_handler import FileTaskHandler DEFAULT_DATE = datetime(2016, 1, 1) TASK_LOGGER = 'airflow.task' @@ -71,3 +72,25 @@ class TestFileTaskLogHandler(unittest.TestCase): # Remove the generated tmp log file. 
os.remove(log_filename) + + +class TestFilenameRendering(unittest.TestCase): + + def setUp(self): + dag = DAG('dag_for_testing_filename_rendering', start_date=DEFAULT_DATE) + task = DummyOperator(task_id='task_for_testing_filename_rendering', dag=dag) + self.ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + + def test_python_formatting(self): + expected_filename = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % DEFAULT_DATE.isoformat() + + fth = FileTaskHandler('', '{dag_id}/{task_id}/{execution_date}/{try_number}.log') + rendered_filename = fth._render_filename(self.ti, 42) + self.assertEqual(expected_filename, rendered_filename) + + def test_jinja_rendering(self): + expected_filename = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % DEFAULT_DATE.isoformat() + + fth = FileTaskHandler('', '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log') + rendered_filename = fth._render_filename(self.ti, 42) + self.assertEqual(expected_filename, rendered_filename) \ No newline at end of file
