Repository: incubator-airflow Updated Branches: refs/heads/master 4fb7a90b3 -> 4ee4e474b
[AIRFLOW-1018] Make processor use logging framework Until now, the DAG processor had its own logging implementation, making it hard to adjust for certain use cases like working in a container. This patch moves everything to the standard logging framework. Closes #2728 from bolkedebruin/AIRFLOW-1018 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4ee4e474 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4ee4e474 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4ee4e474 Branch: refs/heads/master Commit: 4ee4e474b835b4f5f557226ba01b8cdfeb7d0789 Parents: 4fb7a90 Author: Bolke de Bruin <[email protected]> Authored: Mon Oct 30 20:40:12 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Mon Oct 30 20:40:12 2017 +0100 ---------------------------------------------------------------------- .../config_templates/airflow_local_settings.py | 16 +++ airflow/jobs.py | 79 ++++------- airflow/utils/dag_processing.py | 109 +-------------- airflow/utils/log/file_processor_handler.py | 133 +++++++++++++++++++ tests/core.py | 16 --- tests/test_logging_config.py | 8 +- tests/utils/log/__init__.py | 13 ++ tests/utils/log/test_file_processor_handler.py | 90 +++++++++++++ tests/utils/log/test_logging.py | 113 ---------------- tests/utils/log/test_s3_task_handler.py | 114 ++++++++++++++++ 10 files changed, 405 insertions(+), 286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/airflow/config_templates/airflow_local_settings.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index fe3337f..28c263e 100644 --- a/airflow/config_templates/airflow_local_settings.py +++
b/airflow/config_templates/airflow_local_settings.py @@ -25,8 +25,10 @@ LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper() LOG_FORMAT = conf.get('core', 'log_format') BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER') +PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory') FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log' +PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log' DEFAULT_LOGGING_CONFIG = { 'version': 1, @@ -35,6 +37,9 @@ DEFAULT_LOGGING_CONFIG = { 'airflow.task': { 'format': LOG_FORMAT, }, + 'airflow.processor': { + 'format': LOG_FORMAT, + }, }, 'handlers': { 'console': { @@ -47,6 +52,12 @@ DEFAULT_LOGGING_CONFIG = { 'formatter': 'airflow.task', 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), 'filename_template': FILENAME_TEMPLATE, + }, + 'file.processor': { + 'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler', + 'formatter': 'airflow.processor', + 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), + 'filename_template': PROCESSOR_FILENAME_TEMPLATE, } # When using s3 or gcs, provide a customized LOGGING_CONFIG # in airflow_local_settings within your PYTHONPATH, see UPDATING.md @@ -67,6 +78,11 @@ DEFAULT_LOGGING_CONFIG = { # }, }, 'loggers': { + 'airflow.processor' : { + 'handlers': ['file.processor'], + 'level': LOG_LEVEL, + 'propagate': False, + }, 'airflow.task': { 'handlers': ['file.task'], 'level': LOG_LEVEL, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index cc9af90..7a7e564 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -18,6 +18,7 @@ from __future__ import print_function from __future__ import unicode_literals import getpass +import logging import multiprocessing import os import psutil @@ -285,7 +286,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): # Counter that 
increments everytime an instance of this class is created class_creation_counter = 0 - def __init__(self, file_path, pickle_dags, dag_id_white_list, log_file): + def __init__(self, file_path, pickle_dags, dag_id_white_list): """ :param file_path: a Python file containing Airflow DAG definitions :type file_path: unicode @@ -293,11 +294,8 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): :type pickle_dags: bool :param dag_id_whitelist: If specified, only look at these DAG ID's :type dag_id_whitelist: list[unicode] - :param log_file: the path to the file where log lines should be output - :type log_file: unicode """ self._file_path = file_path - self._log_file = log_file # Queue that's used to pass results from the child process. self._result_queue = multiprocessing.Queue() # The process that was launched to process the given . @@ -319,17 +317,12 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): def file_path(self): return self._file_path - @property - def log_file(self): - return self._log_file - @staticmethod def _launch_process(result_queue, file_path, pickle_dags, dag_id_white_list, - thread_name, - log_file): + thread_name): """ Launch a process to process the given file. @@ -345,35 +338,21 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): :type dag_id_white_list: list[unicode] :param thread_name: the name to use for the process that is launched :type thread_name: unicode - :param log_file: the logging output for the process should be directed - to this file - :type log_file: unicode :return: the process that was launched :rtype: multiprocessing.Process """ def helper(): # This helper runs in the newly created process - - # Re-direct stdout and stderr to a separate log file. Otherwise, - # the main log becomes too hard to read. No buffering to enable - # responsive file tailing - parent_dir, _ = os.path.split(log_file) - - _log = LoggingMixin().log - - # Create the parent directory for the log file if necessary. 
- if not os.path.isdir(parent_dir): - os.makedirs(parent_dir) - - f = open(log_file, "a") - original_stdout = sys.stdout - original_stderr = sys.stderr - - sys.stdout = f - sys.stderr = f + log = logging.getLogger("airflow.processor") + for handler in log.handlers: + try: + handler.set_context(file_path) + except AttributeError: + # Not all handlers need to have context passed in so we ignore + # the error when handlers do not have set_context defined. + pass try: - configure_logging() # Re-configure the ORM engine as there are issues with multiple processes settings.configure_orm() @@ -383,26 +362,20 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): threading.current_thread().name = thread_name start_time = time.time() - _log.info("Started process (PID=%s) to work on %s", - os.getpid(), - file_path) - scheduler_job = SchedulerJob(dag_ids=dag_id_white_list) + log.info("Started process (PID=%s) to work on %s", + os.getpid(), file_path) + scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log) result = scheduler_job.process_file(file_path, pickle_dags) result_queue.put(result) end_time = time.time() - _log.info( - "Processing %s took %.3f seconds", - file_path, end_time - start_time + log.info( + "Processing %s took %.3f seconds", file_path, end_time - start_time ) except: # Log exceptions through the logging framework. - _log.exception("Got an exception! Propagating...") + log.exception("Got an exception! 
Propagating...") raise - finally: - sys.stdout = original_stdout - sys.stderr = original_stderr - f.close() p = multiprocessing.Process(target=helper, args=(), @@ -419,8 +392,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): self.file_path, self._pickle_dags, self._dag_id_white_list, - "DagFileProcessor{}".format(self._instance_id), - self.log_file) + "DagFileProcessor{}".format(self._instance_id)) self._start_time = datetime.utcnow() def terminate(self, sigkill=False): @@ -538,6 +510,7 @@ class SchedulerJob(BaseJob): processor_poll_interval=1.0, run_duration=None, do_pickle=False, + log=None, *args, **kwargs): """ :param dag_id: if specified, only schedule tasks with this DAG ID @@ -574,6 +547,10 @@ class SchedulerJob(BaseJob): self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC') self.max_threads = conf.getint('scheduler', 'max_threads') + + if log: + self._log = log + self.using_sqlite = False if 'sqlite' in conf.get('core', 'sql_alchemy_conn'): if self.max_threads > 1: @@ -591,9 +568,7 @@ class SchedulerJob(BaseJob): # Parse and schedule each file no faster than this interval. Default # to 3 minutes. 
self.file_process_interval = file_process_interval - # Directory where log files for the processes that scheduled the DAGs reside - self.child_process_log_directory = conf.get('scheduler', - 'child_process_log_directory') + self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query') if run_duration is None: self.run_duration = conf.getint('scheduler', @@ -1548,17 +1523,15 @@ class SchedulerJob(BaseJob): known_file_paths = list_py_file_paths(self.subdir) self.log.info("There are %s files in %s", len(known_file_paths), self.subdir) - def processor_factory(file_path, log_file_path): + def processor_factory(file_path): return DagFileProcessor(file_path, pickle_dags, - self.dag_ids, - log_file_path) + self.dag_ids) processor_manager = DagFileProcessorManager(self.subdir, known_file_paths, self.max_threads, self.file_process_interval, - self.child_process_log_directory, self.num_runs, processor_factory) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/airflow/utils/dag_processing.py ---------------------------------------------------------------------- diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index ebb5ca0..68cee76 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -278,15 +278,6 @@ class AbstractDagFileProcessor(object): @property @abstractmethod - def log_file(self): - """ - :return: the log file associated with this processor - :rtype: unicode - """ - raise NotImplementedError() - - @property - @abstractmethod def file_path(self): """ :return: the path to the file that this is processing @@ -313,7 +304,6 @@ class DagFileProcessorManager(LoggingMixin): file_paths, parallelism, process_file_interval, - child_process_log_directory, max_runs, processor_factory): """ @@ -330,12 +320,9 @@ class DagFileProcessorManager(LoggingMixin): :param max_runs: The number of times to parse and schedule each file. -1 for unlimited. 
:type max_runs: int - :param child_process_log_directory: Store logs for child processes in - this directory - :type child_process_log_directory: unicode :type process_file_interval: float :param processor_factory: function that creates processors for DAG - definition files. Arguments are (dag_definition_path, log_file_path) + definition files. Arguments are (dag_definition_path) :type processor_factory: (unicode, unicode) -> (AbstractDagFileProcessor) """ @@ -345,7 +332,6 @@ class DagFileProcessorManager(LoggingMixin): self._dag_directory = dag_directory self._max_runs = max_runs self._process_file_interval = process_file_interval - self._child_process_log_directory = child_process_log_directory self._processor_factory = processor_factory # Map from file path to the processor self._processors = {} @@ -447,85 +433,6 @@ class DagFileProcessorManager(LoggingMixin): processor.stop() self._processors = filtered_processors - @staticmethod - def _split_path(file_path): - """ - Return the path elements of a path as an array. E.g. /a/b/c -> - ['a', 'b', 'c'] - - :param file_path: the file path to split - :return: a list of the elements of the file path - :rtype: list[unicode] - """ - results = [] - while True: - head, tail = os.path.split(file_path) - if len(tail) != 0: - results.append(tail) - if file_path == head: - break - file_path = head - results.reverse() - return results - - def _get_log_directory(self): - """ - Log output from processing DAGs for the current day should go into - this directory. - - :return: the path to the corresponding log directory - :rtype: unicode - """ - now = datetime.utcnow() - return os.path.join(self._child_process_log_directory, - now.strftime("%Y-%m-%d")) - - def _get_log_file_path(self, dag_file_path): - """ - Log output from processing the specified file should go to this - location. 
- - :param dag_file_path: file containing a DAG - :type dag_file_path: unicode - :return: the path to the corresponding log file - :rtype: unicode - """ - log_directory = self._get_log_directory() - # General approach is to put the log file under the same relative path - # under the log directory as the DAG file in the DAG directory - relative_dag_file_path = os.path.relpath(dag_file_path, start=self._dag_directory) - path_elements = self._split_path(relative_dag_file_path) - - # Add a .log suffix for the log file - path_elements[-1] += ".log" - - return os.path.join(log_directory, *path_elements) - - def symlink_latest_log_directory(self): - """ - Create symbolic link to the current day's log directory to - allow easy access to the latest scheduler log files. - - :return: None - """ - log_directory = self._get_log_directory() - latest_log_directory_path = os.path.join( - self._child_process_log_directory, "latest") - if os.path.isdir(log_directory): - # if symlink exists but is stale, update it - if os.path.islink(latest_log_directory_path): - if os.readlink(latest_log_directory_path) != log_directory: - os.unlink(latest_log_directory_path) - os.symlink(log_directory, latest_log_directory_path) - elif (os.path.isdir(latest_log_directory_path) or - os.path.isfile(latest_log_directory_path)): - self.log.warning( - "%s already exists as a dir/file. Skip creating symlink.", - latest_log_directory_path - ) - else: - os.symlink(log_directory, latest_log_directory_path) - def processing_count(self): """ :return: the number of files currently being processed @@ -574,8 +481,8 @@ class DagFileProcessorManager(LoggingMixin): for file_path, processor in finished_processors.items(): if processor.result is None: self.log.warning( - "Processor for %s exited with return code %s. 
See %s for details.", - processor.file_path, processor.exit_code, processor.log_file + "Processor for %s exited with return code %s.", + processor.file_path, processor.exit_code ) else: for simple_dag in processor.result: @@ -622,19 +529,15 @@ class DagFileProcessorManager(LoggingMixin): while (self._parallelism - len(self._processors) > 0 and len(self._file_path_queue) > 0): file_path = self._file_path_queue.pop(0) - log_file_path = self._get_log_file_path(file_path) - processor = self._processor_factory(file_path, log_file_path) + processor = self._processor_factory(file_path) processor.start() self.log.info( - "Started a process (PID: %s) to generate tasks for %s - logging into %s", - processor.pid, file_path, log_file_path + "Started a process (PID: %s) to generate tasks for %s", + processor.pid, file_path ) - self._processors[file_path] = processor - self.symlink_latest_log_directory() - # Update scheduler heartbeat count. self._run_count[self._heart_beat_key] += 1 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/airflow/utils/log/file_processor_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py new file mode 100644 index 0000000..cd03a2a --- /dev/null +++ b/airflow/utils/log/file_processor_handler.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import os + +from jinja2 import Template + +from airflow import configuration as conf +from datetime import datetime + + +class FileProcessorHandler(logging.Handler): + """ + FileProcessorHandler is a python log handler that handles + dag processor logs. It creates and delegates log handling + to `logging.FileHandler` after receiving dag processor context. + """ + + def __init__(self, base_log_folder, filename_template): + """ + :param base_log_folder: Base log folder to place logs. + :param filename_template: template filename string + """ + super(FileProcessorHandler, self).__init__() + self.handler = None + self.base_log_folder = base_log_folder + self.dag_dir = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) + self.filename_template = filename_template + self.filename_jinja_template = None + + if "{{" in self.filename_template: #jinja mode + self.filename_jinja_template = Template(self.filename_template) + + self._cur_date = datetime.today() + if not os.path.exists(self._get_log_directory()): + os.makedirs(self._get_log_directory()) + + self._symlink_latest_log_directory() + + def set_context(self, filename): + """ + Provide filename context to airflow task handler. 
+ :param filename: filename in which the dag is located + """ + local_loc = self._init_file(filename) + self.handler = logging.FileHandler(local_loc) + self.handler.setFormatter(self.formatter) + self.handler.setLevel(self.level) + + if self._cur_date < datetime.today(): + self._symlink_latest_log_directory() + self._cur_date = datetime.today() + + def emit(self, record): + if self.handler is not None: + self.handler.emit(record) + + def flush(self): + if self.handler is not None: + self.handler.flush() + + def close(self): + if self.handler is not None: + self.handler.close() + + def _render_filename(self, filename): + filename = os.path.relpath(filename, self.dag_dir) + ctx = dict() + ctx['filename'] = filename + + if self.filename_jinja_template: + return self.filename_jinja_template.render(**ctx) + + return self.filename_template.format(filename=ctx['filename']) + + def _get_log_directory(self): + now = datetime.utcnow() + + return os.path.join(self.base_log_folder, now.strftime("%Y-%m-%d")) + + def _symlink_latest_log_directory(self): + """ + Create symbolic link to the current day's log directory to + allow easy access to the latest scheduler log files. + + :return: None + """ + log_directory = self._get_log_directory() + latest_log_directory_path = os.path.join(self.base_log_folder, "latest") + if os.path.isdir(log_directory): + # if symlink exists but is stale, update it + if os.path.islink(latest_log_directory_path): + if os.readlink(latest_log_directory_path) != log_directory: + os.unlink(latest_log_directory_path) + os.symlink(log_directory, latest_log_directory_path) + elif (os.path.isdir(latest_log_directory_path) or + os.path.isfile(latest_log_directory_path)): + self.log.warning( + "%s already exists as a dir/file. Skip creating symlink.", + latest_log_directory_path + ) + else: + os.symlink(log_directory, latest_log_directory_path) + + def _init_file(self, filename): + """ + Create log file and directory if required. 
+ :param filename: task instance object + :return relative log path of the given task instance + """ + relative_path = self._render_filename(filename) + full_path = os.path.join(self._get_log_directory(), relative_path) + directory = os.path.dirname(full_path) + + if not os.path.exists(directory): + os.makedirs(directory) + + if not os.path.exists(full_path): + open(full_path, "a").close() + + return full_path http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 513ed45..f49dc2a 100644 --- a/tests/core.py +++ b/tests/core.py @@ -666,22 +666,6 @@ class CoreTest(unittest.TestCase): job = jobs.LocalTaskJob(task_instance=ti, ignore_ti_state=True) job.run() - @mock.patch('airflow.utils.dag_processing.datetime', FakeDatetime) - def test_scheduler_job(self): - FakeDatetime.utcnow = classmethod(lambda cls: datetime(2016, 1, 1)) - job = jobs.SchedulerJob(dag_id='example_bash_operator', - **self.default_scheduler_args) - job.run() - log_base_directory = configuration.conf.get("scheduler", - "child_process_log_directory") - latest_log_directory_path = os.path.join(log_base_directory, "latest") - # verify that the symlink to the latest logs exists - self.assertTrue(os.path.islink(latest_log_directory_path)) - - # verify that the symlink points to the correct log directory - log_directory = os.path.join(log_base_directory, "2016-01-01") - self.assertEqual(os.readlink(latest_log_directory_path), log_directory) - def test_raw_job(self): TI = models.TaskInstance ti = TI( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/test_logging_config.py ---------------------------------------------------------------------- diff --git a/tests/test_logging_config.py b/tests/test_logging_config.py index 9407221..516f666 100644 --- a/tests/test_logging_config.py +++ b/tests/test_logging_config.py @@ -196,7 +196,13 @@ 
class TestLoggingSettings(unittest.TestCase): def test_when_the_config_key_does_not_exists(self): from airflow import logging_config - logging_config.conf.get = mock.Mock(side_effect=AirflowConfigException('boom')) + def side_effect(*args): + if args[1] == 'logging_config_class': + raise AirflowConfigException + else: + return "bla_bla_from_test" + + logging_config.conf.get = mock.Mock(side_effect=side_effect) with patch.object(logging_config.log, 'debug') as mock_debug: logging_config.configure_logging() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/utils/log/__init__.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/__init__.py b/tests/utils/log/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/tests/utils/log/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/utils/log/test_file_processor_handler.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/test_file_processor_handler.py b/tests/utils/log/test_file_processor_handler.py new file mode 100644 index 0000000..defe623 --- /dev/null +++ b/tests/utils/log/test_file_processor_handler.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import shutil +import os +import unittest + +from airflow.utils.log.file_processor_handler import FileProcessorHandler +from datetime import datetime +from datetime import timedelta +from freezegun import freeze_time + + +class TestFileProcessorHandler(unittest.TestCase): + def setUp(self): + super(TestFileProcessorHandler, self).setUp() + self.base_log_folder = "/tmp/log_test" + self.filename = "{filename}" + self.filename_template = "{{ filename }}.log" + self.dag_dir = "/dags" + + def test_non_template(self): + date = datetime.utcnow().strftime("%Y-%m-%d") + handler = FileProcessorHandler(base_log_folder=self.base_log_folder, + filename_template=self.filename) + handler.dag_dir = self.dag_dir + + path = os.path.join(self.base_log_folder, "latest") + self.assertTrue(os.path.islink(path)) + self.assertEqual(os.path.basename(os.readlink(path)), date) + + handler.set_context(filename=os.path.join(self.dag_dir, "logfile")) + self.assertTrue(os.path.exists(os.path.join(path, "logfile"))) + + def test_template(self): + date = datetime.utcnow().strftime("%Y-%m-%d") + handler = FileProcessorHandler(base_log_folder=self.base_log_folder, + filename_template=self.filename_template) + handler.dag_dir = self.dag_dir + + path = os.path.join(self.base_log_folder, "latest") + self.assertTrue(os.path.islink(path)) + self.assertEqual(os.path.basename(os.readlink(path)), date) + + handler.set_context(filename=os.path.join(self.dag_dir, "logfile")) + self.assertTrue(os.path.exists(os.path.join(path, "logfile.log"))) + + def test_symlink_latest_log_directory(self): + handler = FileProcessorHandler(base_log_folder=self.base_log_folder, + filename_template=self.filename) + handler.dag_dir = self.dag_dir + + date1 = (datetime.utcnow() + timedelta(days=1)).strftime("%Y-%m-%d") + date2 = (datetime.utcnow() + timedelta(days=2)).strftime("%Y-%m-%d") + + p1 = os.path.join(self.base_log_folder, date1, "log1") + p2 = os.path.join(self.base_log_folder, date1, "log2") + + if 
os.path.exists(p1): + os.remove(p1) + if os.path.exists(p2): + os.remove(p2) + + link = os.path.join(self.base_log_folder, "latest") + + with freeze_time(date1): + handler.set_context(filename=os.path.join(self.dag_dir, "log1")) + self.assertTrue(os.path.islink(link)) + self.assertEqual(os.path.basename(os.readlink(link)), date1) + self.assertTrue(os.path.exists(os.path.join(link, "log1"))) + + with freeze_time(date2): + handler.set_context(filename=os.path.join(self.dag_dir, "log2")) + self.assertTrue(os.path.islink(link)) + self.assertEqual(os.path.basename(os.readlink(link)), date2) + self.assertTrue(os.path.exists(os.path.join(link, "log2"))) + + def tearDown(self): + shutil.rmtree(self.base_log_folder, ignore_errors=True) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/utils/log/test_logging.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/test_logging.py b/tests/utils/log/test_logging.py deleted file mode 100644 index 57f869f..0000000 --- a/tests/utils/log/test_logging.py +++ /dev/null @@ -1,113 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import mock -import unittest - -from airflow.utils.log.s3_task_handler import S3TaskHandler - - -class TestS3TaskHandler(unittest.TestCase): - - def setUp(self): - super(S3TaskHandler, self).setUp() - self.remote_log_location = 'remote/log/location' - self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook") - self.hook_mock = self.hook_patcher.start() - self.hook_inst_mock = self.hook_mock.return_value - self.hook_key_mock = self.hook_inst_mock.get_key.return_value - self.hook_key_mock.get_contents_as_string.return_value.decode.\ - return_value = 'content' - - def tearDown(self): - self.hook_patcher.stop() - super(S3TaskHandler, self).tearDown() - - def test_init(self): - S3TaskHandler() - self.hook_mock.assert_called_once_with('') - - def test_init_raises(self): - self.hook_mock.side_effect = Exception('Failed to connect') - handler = S3TaskHandler() - with mock.patch.object(handler.log, 'error') as mock_error: - # Initialize the hook - handler.hook() - mock_error.assert_called_once_with( - 'Could not create an S3Hook with connection id "". Please make ' - 'sure that airflow[s3] is installed and the S3 connection exists.' 
- ) - - def test_log_exists(self): - self.assertTrue(S3TaskHandler().log_exists(self.remote_log_location)) - - def test_log_exists_none(self): - self.hook_inst_mock.get_key.return_value = None - self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) - - def test_log_exists_raises(self): - self.hook_inst_mock.get_key.side_effect = Exception('error') - self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) - - def test_log_exists_false(self): - self.hook_inst_mock.check_for_key.return_value = False - self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) - - def test_log_exists_no_hook(self): - self.hook_mock.side_effect = Exception('Failed to connect') - self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) - - def test_read(self): - self.assertEqual( - S3TaskHandler().read(self.remote_log_location), - 'content' - ) - - def test_read_key_empty(self): - self.hook_inst_mock.get_key.return_value = None - self.assertEqual(S3TaskHandler().read(self.remote_log_location), '') - - def test_read_raises(self): - self.hook_inst_mock.get_key.side_effect = Exception('error') - self.assertEqual(S3TaskHandler().read(self.remote_log_location), '') - - def test_read_raises_return_error(self): - self.hook_inst_mock.get_key.side_effect = Exception('error') - handler = S3TaskHandler() - with mock.patch.object(handler.log, 'error') as mock_error: - result = handler.s3_log_read( - self.remote_log_location, - return_error=True - ) - msg = 'Could not read logs from %s' % self.remote_log_location - self.assertEqual(result, msg) - mock_error.assert_called_once_with(msg) - - def test_write(self): - S3TaskHandler().write('text', self.remote_log_location) - self.hook_inst_mock.load_string.assert_called_once_with( - 'content\ntext', - key=self.remote_log_location, - replace=True, - encrypt=False, - ) - - def test_write_raises(self): - self.hook_inst_mock.read_key.return_value = '' - self.hook_inst_mock.load_string.side_effect = 
Exception('error') - handler = S3TaskHandler() - with mock.patch.object(handler.log, 'error') as mock_error: - handler.write('text', self.remote_log_location) - msg = 'Could not write logs to %s' % self.remote_log_location - mock_error.assert_called_once_with(msg) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/utils/log/test_s3_task_handler.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py new file mode 100644 index 0000000..0438630 --- /dev/null +++ b/tests/utils/log/test_s3_task_handler.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + import mock + import unittest + + from airflow.utils.log.s3_task_handler import S3TaskHandler + + +@unittest.skip("Non functional S3 tests") +class TestS3TaskHandler(unittest.TestCase): + + def setUp(self): + super(TestS3TaskHandler, self).setUp() + self.remote_log_location = 'remote/log/location' + self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook") + self.hook_mock = self.hook_patcher.start() + self.hook_inst_mock = self.hook_mock.return_value + self.hook_key_mock = self.hook_inst_mock.get_key.return_value + self.hook_key_mock.get_contents_as_string.return_value.decode.\ + return_value = 'content' + + def tearDown(self): + self.hook_patcher.stop() + super(TestS3TaskHandler, self).tearDown() + + def test_init(self): + S3TaskHandler() + self.hook_mock.assert_called_once_with('') + + def test_init_raises(self): + self.hook_mock.side_effect = Exception('Failed to connect') + handler = S3TaskHandler() + with mock.patch.object(handler.log, 'error') as mock_error: + # Initialize the hook + handler.hook() + mock_error.assert_called_once_with( + 'Could not create an S3Hook with connection id "". Please make ' + 'sure that airflow[s3] is installed and the S3 connection exists.'
+ ) + + def test_log_exists(self): + self.assertTrue(S3TaskHandler().log_exists(self.remote_log_location)) + + def test_log_exists_none(self): + self.hook_inst_mock.get_key.return_value = None + self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) + + def test_log_exists_raises(self): + self.hook_inst_mock.get_key.side_effect = Exception('error') + self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) + + def test_log_exists_false(self): + self.hook_inst_mock.check_for_key.return_value = False + self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) + + def test_log_exists_no_hook(self): + self.hook_mock.side_effect = Exception('Failed to connect') + self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) + + def test_read(self): + self.assertEqual( + S3TaskHandler().read(self.remote_log_location), + 'content' + ) + + def test_read_key_empty(self): + self.hook_inst_mock.get_key.return_value = None + self.assertEqual(S3TaskHandler().read(self.remote_log_location), '') + + def test_read_raises(self): + self.hook_inst_mock.get_key.side_effect = Exception('error') + self.assertEqual(S3TaskHandler().read(self.remote_log_location), '') + + def test_read_raises_return_error(self): + self.hook_inst_mock.get_key.side_effect = Exception('error') + handler = S3TaskHandler() + with mock.patch.object(handler.log, 'error') as mock_error: + result = handler.s3_log_read( + self.remote_log_location, + return_error=True + ) + msg = 'Could not read logs from %s' % self.remote_log_location + self.assertEqual(result, msg) + mock_error.assert_called_once_with(msg) + + def test_write(self): + S3TaskHandler().write('text', self.remote_log_location) + self.hook_inst_mock.load_string.assert_called_once_with( + 'content\ntext', + key=self.remote_log_location, + replace=True, + encrypt=False, + ) + + def test_write_raises(self): + self.hook_inst_mock.read_key.return_value = '' + self.hook_inst_mock.load_string.side_effect = 
Exception('error') + handler = S3TaskHandler() + with mock.patch.object(handler.log, 'error') as mock_error: + handler.write('text', self.remote_log_location) + msg = 'Could not write logs to %s' % self.remote_log_location + mock_error.assert_called_once_with(msg)
