Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4fb7a90b3 -> 4ee4e474b


[AIRFLOW-1018] Make processor use logging framework

Until now, the DAG processor had its own logging implementation,
making it hard to adjust for certain use cases, like working in a
container.

This patch moves everything to the standard
logging framework.

Closes #2728 from bolkedebruin/AIRFLOW-1018


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4ee4e474
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4ee4e474
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4ee4e474

Branch: refs/heads/master
Commit: 4ee4e474b835b4f5f557226ba01b8cdfeb7d0789
Parents: 4fb7a90
Author: Bolke de Bruin <[email protected]>
Authored: Mon Oct 30 20:40:12 2017 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Mon Oct 30 20:40:12 2017 +0100

----------------------------------------------------------------------
 .../config_templates/airflow_local_settings.py  |  16 +++
 airflow/jobs.py                                 |  79 ++++-------
 airflow/utils/dag_processing.py                 | 109 +--------------
 airflow/utils/log/file_processor_handler.py     | 133 +++++++++++++++++++
 tests/core.py                                   |  16 ---
 tests/test_logging_config.py                    |   8 +-
 tests/utils/log/__init__.py                     |  13 ++
 tests/utils/log/test_file_processor_handler.py  |  90 +++++++++++++
 tests/utils/log/test_logging.py                 | 113 ----------------
 tests/utils/log/test_s3_task_handler.py         | 114 ++++++++++++++++
 10 files changed, 405 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/airflow/config_templates/airflow_local_settings.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/airflow_local_settings.py 
b/airflow/config_templates/airflow_local_settings.py
index fe3337f..28c263e 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -25,8 +25,10 @@ LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
 LOG_FORMAT = conf.get('core', 'log_format')
 
 BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
 
 FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number 
}}.log'
+PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
 
 DEFAULT_LOGGING_CONFIG = {
     'version': 1,
@@ -35,6 +37,9 @@ DEFAULT_LOGGING_CONFIG = {
         'airflow.task': {
             'format': LOG_FORMAT,
         },
+        'airflow.processor': {
+            'format': LOG_FORMAT,
+        },
     },
     'handlers': {
         'console': {
@@ -47,6 +52,12 @@ DEFAULT_LOGGING_CONFIG = {
             'formatter': 'airflow.task',
             'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
             'filename_template': FILENAME_TEMPLATE,
+        },
+        'file.processor': {
+            'class': 
'airflow.utils.log.file_processor_handler.FileProcessorHandler',
+            'formatter': 'airflow.processor',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
         }
         # When using s3 or gcs, provide a customized LOGGING_CONFIG
         # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
@@ -67,6 +78,11 @@ DEFAULT_LOGGING_CONFIG = {
         # },
     },
     'loggers': {
+        'airflow.processor' : {
+            'handlers': ['file.processor'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
         'airflow.task': {
             'handlers': ['file.task'],
             'level': LOG_LEVEL,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index cc9af90..7a7e564 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -18,6 +18,7 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import getpass
+import logging
 import multiprocessing
 import os
 import psutil
@@ -285,7 +286,7 @@ class DagFileProcessor(AbstractDagFileProcessor, 
LoggingMixin):
     # Counter that increments everytime an instance of this class is created
     class_creation_counter = 0
 
-    def __init__(self, file_path, pickle_dags, dag_id_white_list, log_file):
+    def __init__(self, file_path, pickle_dags, dag_id_white_list):
         """
         :param file_path: a Python file containing Airflow DAG definitions
         :type file_path: unicode
@@ -293,11 +294,8 @@ class DagFileProcessor(AbstractDagFileProcessor, 
LoggingMixin):
         :type pickle_dags: bool
         :param dag_id_whitelist: If specified, only look at these DAG ID's
         :type dag_id_whitelist: list[unicode]
-        :param log_file: the path to the file where log lines should be output
-        :type log_file: unicode
         """
         self._file_path = file_path
-        self._log_file = log_file
         # Queue that's used to pass results from the child process.
         self._result_queue = multiprocessing.Queue()
         # The process that was launched to process the given .
@@ -319,17 +317,12 @@ class DagFileProcessor(AbstractDagFileProcessor, 
LoggingMixin):
     def file_path(self):
         return self._file_path
 
-    @property
-    def log_file(self):
-        return self._log_file
-
     @staticmethod
     def _launch_process(result_queue,
                         file_path,
                         pickle_dags,
                         dag_id_white_list,
-                        thread_name,
-                        log_file):
+                        thread_name):
         """
         Launch a process to process the given file.
 
@@ -345,35 +338,21 @@ class DagFileProcessor(AbstractDagFileProcessor, 
LoggingMixin):
         :type dag_id_white_list: list[unicode]
         :param thread_name: the name to use for the process that is launched
         :type thread_name: unicode
-        :param log_file: the logging output for the process should be directed
-        to this file
-        :type log_file: unicode
         :return: the process that was launched
         :rtype: multiprocessing.Process
         """
         def helper():
             # This helper runs in the newly created process
-
-            # Re-direct stdout and stderr to a separate log file. Otherwise,
-            # the main log becomes too hard to read. No buffering to enable
-            # responsive file tailing
-            parent_dir, _ = os.path.split(log_file)
-
-            _log = LoggingMixin().log
-
-            # Create the parent directory for the log file if necessary.
-            if not os.path.isdir(parent_dir):
-                os.makedirs(parent_dir)
-
-            f = open(log_file, "a")
-            original_stdout = sys.stdout
-            original_stderr = sys.stderr
-
-            sys.stdout = f
-            sys.stderr = f
+            log = logging.getLogger("airflow.processor")
+            for handler in log.handlers:
+                try:
+                    handler.set_context(file_path)
+                except AttributeError:
+                    # Not all handlers need to have context passed in so we 
ignore
+                    # the error when handlers do not have set_context defined.
+                    pass
 
             try:
-                configure_logging()
                 # Re-configure the ORM engine as there are issues with 
multiple processes
                 settings.configure_orm()
 
@@ -383,26 +362,20 @@ class DagFileProcessor(AbstractDagFileProcessor, 
LoggingMixin):
                 threading.current_thread().name = thread_name
                 start_time = time.time()
 
-                _log.info("Started process (PID=%s) to work on %s",
-                             os.getpid(),
-                             file_path)
-                scheduler_job = SchedulerJob(dag_ids=dag_id_white_list)
+                log.info("Started process (PID=%s) to work on %s",
+                         os.getpid(), file_path)
+                scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, 
log=log)
                 result = scheduler_job.process_file(file_path,
                                                     pickle_dags)
                 result_queue.put(result)
                 end_time = time.time()
-                _log.info(
-                    "Processing %s took %.3f seconds",
-                    file_path, end_time - start_time
+                log.info(
+                    "Processing %s took %.3f seconds", file_path, end_time - 
start_time
                 )
             except:
                 # Log exceptions through the logging framework.
-                _log.exception("Got an exception! Propagating...")
+                log.exception("Got an exception! Propagating...")
                 raise
-            finally:
-                sys.stdout = original_stdout
-                sys.stderr = original_stderr
-                f.close()
 
         p = multiprocessing.Process(target=helper,
                                     args=(),
@@ -419,8 +392,7 @@ class DagFileProcessor(AbstractDagFileProcessor, 
LoggingMixin):
             self.file_path,
             self._pickle_dags,
             self._dag_id_white_list,
-            "DagFileProcessor{}".format(self._instance_id),
-            self.log_file)
+            "DagFileProcessor{}".format(self._instance_id))
         self._start_time = datetime.utcnow()
 
     def terminate(self, sigkill=False):
@@ -538,6 +510,7 @@ class SchedulerJob(BaseJob):
             processor_poll_interval=1.0,
             run_duration=None,
             do_pickle=False,
+            log=None,
             *args, **kwargs):
         """
         :param dag_id: if specified, only schedule tasks with this DAG ID
@@ -574,6 +547,10 @@ class SchedulerJob(BaseJob):
 
         self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
         self.max_threads = conf.getint('scheduler', 'max_threads')
+
+        if log:
+            self._log = log
+
         self.using_sqlite = False
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
             if self.max_threads > 1:
@@ -591,9 +568,7 @@ class SchedulerJob(BaseJob):
         # Parse and schedule each file no faster than this interval. Default
         # to 3 minutes.
         self.file_process_interval = file_process_interval
-        # Directory where log files for the processes that scheduled the DAGs 
reside
-        self.child_process_log_directory = conf.get('scheduler',
-                                                    
'child_process_log_directory')
+
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         if run_duration is None:
             self.run_duration = conf.getint('scheduler',
@@ -1548,17 +1523,15 @@ class SchedulerJob(BaseJob):
         known_file_paths = list_py_file_paths(self.subdir)
         self.log.info("There are %s files in %s", len(known_file_paths), 
self.subdir)
 
-        def processor_factory(file_path, log_file_path):
+        def processor_factory(file_path):
             return DagFileProcessor(file_path,
                                     pickle_dags,
-                                    self.dag_ids,
-                                    log_file_path)
+                                    self.dag_ids)
 
         processor_manager = DagFileProcessorManager(self.subdir,
                                                     known_file_paths,
                                                     self.max_threads,
                                                     self.file_process_interval,
-                                                    
self.child_process_log_directory,
                                                     self.num_runs,
                                                     processor_factory)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index ebb5ca0..68cee76 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -278,15 +278,6 @@ class AbstractDagFileProcessor(object):
 
     @property
     @abstractmethod
-    def log_file(self):
-        """
-        :return: the log file associated with this processor
-        :rtype: unicode
-        """
-        raise NotImplementedError()
-
-    @property
-    @abstractmethod
     def file_path(self):
         """
         :return: the path to the file that this is processing
@@ -313,7 +304,6 @@ class DagFileProcessorManager(LoggingMixin):
                  file_paths,
                  parallelism,
                  process_file_interval,
-                 child_process_log_directory,
                  max_runs,
                  processor_factory):
         """
@@ -330,12 +320,9 @@ class DagFileProcessorManager(LoggingMixin):
         :param max_runs: The number of times to parse and schedule each file. 
-1
         for unlimited.
         :type max_runs: int
-        :param child_process_log_directory: Store logs for child processes in
-        this directory
-        :type child_process_log_directory: unicode
         :type process_file_interval: float
         :param processor_factory: function that creates processors for DAG
-        definition files. Arguments are (dag_definition_path, log_file_path)
+        definition files. Arguments are (dag_definition_path)
         :type processor_factory: (unicode, unicode) -> 
(AbstractDagFileProcessor)
 
         """
@@ -345,7 +332,6 @@ class DagFileProcessorManager(LoggingMixin):
         self._dag_directory = dag_directory
         self._max_runs = max_runs
         self._process_file_interval = process_file_interval
-        self._child_process_log_directory = child_process_log_directory
         self._processor_factory = processor_factory
         # Map from file path to the processor
         self._processors = {}
@@ -447,85 +433,6 @@ class DagFileProcessorManager(LoggingMixin):
                 processor.stop()
         self._processors = filtered_processors
 
-    @staticmethod
-    def _split_path(file_path):
-        """
-        Return the path elements of a path as an array. E.g. /a/b/c ->
-        ['a', 'b', 'c']
-
-        :param file_path: the file path to split
-        :return: a list of the elements of the file path
-        :rtype: list[unicode]
-        """
-        results = []
-        while True:
-            head, tail = os.path.split(file_path)
-            if len(tail) != 0:
-                results.append(tail)
-            if file_path == head:
-                break
-            file_path = head
-        results.reverse()
-        return results
-
-    def _get_log_directory(self):
-        """
-        Log output from processing DAGs for the current day should go into
-        this directory.
-
-        :return: the path to the corresponding log directory
-        :rtype: unicode
-        """
-        now = datetime.utcnow()
-        return os.path.join(self._child_process_log_directory,
-                            now.strftime("%Y-%m-%d"))
-
-    def _get_log_file_path(self, dag_file_path):
-        """
-        Log output from processing the specified file should go to this
-        location.
-
-        :param dag_file_path: file containing a DAG
-        :type dag_file_path: unicode
-        :return: the path to the corresponding log file
-        :rtype: unicode
-        """
-        log_directory = self._get_log_directory()
-        # General approach is to put the log file under the same relative path
-        # under the log directory as the DAG file in the DAG directory
-        relative_dag_file_path = os.path.relpath(dag_file_path, 
start=self._dag_directory)
-        path_elements = self._split_path(relative_dag_file_path)
-
-        # Add a .log suffix for the log file
-        path_elements[-1] += ".log"
-
-        return os.path.join(log_directory, *path_elements)
-
-    def symlink_latest_log_directory(self):
-        """
-        Create symbolic link to the current day's log directory to
-        allow easy access to the latest scheduler log files.
-
-        :return: None
-        """
-        log_directory = self._get_log_directory()
-        latest_log_directory_path = os.path.join(
-            self._child_process_log_directory, "latest")
-        if os.path.isdir(log_directory):
-            # if symlink exists but is stale, update it
-            if os.path.islink(latest_log_directory_path):
-                if os.readlink(latest_log_directory_path) != log_directory:
-                    os.unlink(latest_log_directory_path)
-                    os.symlink(log_directory, latest_log_directory_path)
-            elif (os.path.isdir(latest_log_directory_path) or
-                    os.path.isfile(latest_log_directory_path)):
-                self.log.warning(
-                    "%s already exists as a dir/file. Skip creating symlink.",
-                    latest_log_directory_path
-                )
-            else:
-                os.symlink(log_directory, latest_log_directory_path)
-
     def processing_count(self):
         """
         :return: the number of files currently being processed
@@ -574,8 +481,8 @@ class DagFileProcessorManager(LoggingMixin):
         for file_path, processor in finished_processors.items():
             if processor.result is None:
                 self.log.warning(
-                    "Processor for %s exited with return code %s. See %s for 
details.",
-                    processor.file_path, processor.exit_code, 
processor.log_file
+                    "Processor for %s exited with return code %s.",
+                    processor.file_path, processor.exit_code
                 )
             else:
                 for simple_dag in processor.result:
@@ -622,19 +529,15 @@ class DagFileProcessorManager(LoggingMixin):
         while (self._parallelism - len(self._processors) > 0 and
                len(self._file_path_queue) > 0):
             file_path = self._file_path_queue.pop(0)
-            log_file_path = self._get_log_file_path(file_path)
-            processor = self._processor_factory(file_path, log_file_path)
+            processor = self._processor_factory(file_path)
 
             processor.start()
             self.log.info(
-                "Started a process (PID: %s) to generate tasks for %s - 
logging into %s",
-                processor.pid, file_path, log_file_path
+                "Started a process (PID: %s) to generate tasks for %s",
+                processor.pid, file_path
             )
-
             self._processors[file_path] = processor
 
-        self.symlink_latest_log_directory()
-
         # Update scheduler heartbeat count.
         self._run_count[self._heart_beat_key] += 1
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/airflow/utils/log/file_processor_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_processor_handler.py 
b/airflow/utils/log/file_processor_handler.py
new file mode 100644
index 0000000..cd03a2a
--- /dev/null
+++ b/airflow/utils/log/file_processor_handler.py
@@ -0,0 +1,133 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+from jinja2 import Template
+
+from airflow import configuration as conf
+from datetime import datetime
+
+
+class FileProcessorHandler(logging.Handler):
+    """
+    FileProcessorHandler is a python log handler that handles
+    dag processor logs. It creates and delegates log handling
+    to `logging.FileHandler` after receiving dag processor context.
+    """
+
+    def __init__(self, base_log_folder, filename_template):
+        """
+        :param base_log_folder: Base log folder to place logs.
+        :param filename_template: template filename string
+        """
+        super(FileProcessorHandler, self).__init__()
+        self.handler = None
+        self.base_log_folder = base_log_folder
+        self.dag_dir = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
+        self.filename_template = filename_template
+        self.filename_jinja_template = None
+
+        if "{{" in self.filename_template: #jinja mode
+            self.filename_jinja_template = Template(self.filename_template)
+
+        self._cur_date = datetime.today()
+        if not os.path.exists(self._get_log_directory()):
+            os.makedirs(self._get_log_directory())
+
+        self._symlink_latest_log_directory()
+
+    def set_context(self, filename):
+        """
+        Provide filename context to airflow task handler.
+        :param filename: filename in which the dag is located
+        """
+        local_loc = self._init_file(filename)
+        self.handler = logging.FileHandler(local_loc)
+        self.handler.setFormatter(self.formatter)
+        self.handler.setLevel(self.level)
+
+        if self._cur_date < datetime.today():
+            self._symlink_latest_log_directory()
+            self._cur_date = datetime.today()
+
+    def emit(self, record):
+        if self.handler is not None:
+            self.handler.emit(record)
+
+    def flush(self):
+        if self.handler is not None:
+            self.handler.flush()
+
+    def close(self):
+        if self.handler is not None:
+            self.handler.close()
+
+    def _render_filename(self, filename):
+        filename = os.path.relpath(filename, self.dag_dir)
+        ctx = dict()
+        ctx['filename'] = filename
+
+        if self.filename_jinja_template:
+            return self.filename_jinja_template.render(**ctx)
+
+        return self.filename_template.format(filename=ctx['filename'])
+
+    def _get_log_directory(self):
+        now = datetime.utcnow()
+
+        return os.path.join(self.base_log_folder, now.strftime("%Y-%m-%d"))
+
+    def _symlink_latest_log_directory(self):
+        """
+        Create symbolic link to the current day's log directory to
+        allow easy access to the latest scheduler log files.
+
+        :return: None
+        """
+        log_directory = self._get_log_directory()
+        latest_log_directory_path = os.path.join(self.base_log_folder, 
"latest")
+        if os.path.isdir(log_directory):
+            # if symlink exists but is stale, update it
+            if os.path.islink(latest_log_directory_path):
+                if os.readlink(latest_log_directory_path) != log_directory:
+                    os.unlink(latest_log_directory_path)
+                    os.symlink(log_directory, latest_log_directory_path)
+            elif (os.path.isdir(latest_log_directory_path) or
+                      os.path.isfile(latest_log_directory_path)):
+                self.log.warning(
+                    "%s already exists as a dir/file. Skip creating symlink.",
+                    latest_log_directory_path
+                )
+            else:
+                os.symlink(log_directory, latest_log_directory_path)
+
+    def _init_file(self, filename):
+        """
+        Create log file and directory if required.
+        :param filename: task instance object
+        :return relative log path of the given task instance
+        """
+        relative_path = self._render_filename(filename)
+        full_path = os.path.join(self._get_log_directory(), relative_path)
+        directory = os.path.dirname(full_path)
+
+        if not os.path.exists(directory):
+            os.makedirs(directory)
+
+        if not os.path.exists(full_path):
+            open(full_path, "a").close()
+
+        return full_path

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 513ed45..f49dc2a 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -666,22 +666,6 @@ class CoreTest(unittest.TestCase):
         job = jobs.LocalTaskJob(task_instance=ti, ignore_ti_state=True)
         job.run()
 
-    @mock.patch('airflow.utils.dag_processing.datetime', FakeDatetime)
-    def test_scheduler_job(self):
-        FakeDatetime.utcnow = classmethod(lambda cls: datetime(2016, 1, 1))
-        job = jobs.SchedulerJob(dag_id='example_bash_operator',
-                                **self.default_scheduler_args)
-        job.run()
-        log_base_directory = configuration.conf.get("scheduler",
-            "child_process_log_directory")
-        latest_log_directory_path = os.path.join(log_base_directory, "latest")
-        # verify that the symlink to the latest logs exists
-        self.assertTrue(os.path.islink(latest_log_directory_path))
-
-        # verify that the symlink points to the correct log directory
-        log_directory = os.path.join(log_base_directory, "2016-01-01")
-        self.assertEqual(os.readlink(latest_log_directory_path), log_directory)
-
     def test_raw_job(self):
         TI = models.TaskInstance
         ti = TI(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/test_logging_config.py
----------------------------------------------------------------------
diff --git a/tests/test_logging_config.py b/tests/test_logging_config.py
index 9407221..516f666 100644
--- a/tests/test_logging_config.py
+++ b/tests/test_logging_config.py
@@ -196,7 +196,13 @@ class TestLoggingSettings(unittest.TestCase):
     def test_when_the_config_key_does_not_exists(self):
         from airflow import logging_config
 
-        logging_config.conf.get = 
mock.Mock(side_effect=AirflowConfigException('boom'))
+        def side_effect(*args):
+            if args[1] == 'logging_config_class':
+                raise AirflowConfigException
+            else:
+                return "bla_bla_from_test"
+
+        logging_config.conf.get = mock.Mock(side_effect=side_effect)
 
         with patch.object(logging_config.log, 'debug') as mock_debug:
             logging_config.configure_logging()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/utils/log/__init__.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/__init__.py b/tests/utils/log/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/utils/log/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/utils/log/test_file_processor_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_file_processor_handler.py 
b/tests/utils/log/test_file_processor_handler.py
new file mode 100644
index 0000000..defe623
--- /dev/null
+++ b/tests/utils/log/test_file_processor_handler.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import shutil
+import os
+import unittest
+
+from airflow.utils.log.file_processor_handler import FileProcessorHandler
+from datetime import datetime
+from datetime import timedelta
+from freezegun import freeze_time
+
+
+class TestFileProcessorHandler(unittest.TestCase):
+    def setUp(self):
+        super(TestFileProcessorHandler, self).setUp()
+        self.base_log_folder = "/tmp/log_test"
+        self.filename = "{filename}"
+        self.filename_template = "{{ filename }}.log"
+        self.dag_dir = "/dags"
+
+    def test_non_template(self):
+        date = datetime.utcnow().strftime("%Y-%m-%d")
+        handler = FileProcessorHandler(base_log_folder=self.base_log_folder,
+                                       filename_template=self.filename)
+        handler.dag_dir = self.dag_dir
+
+        path = os.path.join(self.base_log_folder, "latest")
+        self.assertTrue(os.path.islink(path))
+        self.assertEqual(os.path.basename(os.readlink(path)), date)
+
+        handler.set_context(filename=os.path.join(self.dag_dir, "logfile"))
+        self.assertTrue(os.path.exists(os.path.join(path, "logfile")))
+
+    def test_template(self):
+        date = datetime.utcnow().strftime("%Y-%m-%d")
+        handler = FileProcessorHandler(base_log_folder=self.base_log_folder,
+                                       
filename_template=self.filename_template)
+        handler.dag_dir = self.dag_dir
+
+        path = os.path.join(self.base_log_folder, "latest")
+        self.assertTrue(os.path.islink(path))
+        self.assertEqual(os.path.basename(os.readlink(path)), date)
+
+        handler.set_context(filename=os.path.join(self.dag_dir, "logfile"))
+        self.assertTrue(os.path.exists(os.path.join(path, "logfile.log")))
+
+    def test_symlink_latest_log_directory(self):
+        handler = FileProcessorHandler(base_log_folder=self.base_log_folder,
+                                       filename_template=self.filename)
+        handler.dag_dir = self.dag_dir
+
+        date1 = (datetime.utcnow() + timedelta(days=1)).strftime("%Y-%m-%d")
+        date2 = (datetime.utcnow() + timedelta(days=2)).strftime("%Y-%m-%d")
+
+        p1 = os.path.join(self.base_log_folder, date1, "log1")
+        p2 = os.path.join(self.base_log_folder, date1, "log2")
+
+        if os.path.exists(p1):
+            os.remove(p1)
+        if os.path.exists(p2):
+            os.remove(p2)
+
+        link = os.path.join(self.base_log_folder, "latest")
+
+        with freeze_time(date1):
+            handler.set_context(filename=os.path.join(self.dag_dir, "log1"))
+            self.assertTrue(os.path.islink(link))
+            self.assertEqual(os.path.basename(os.readlink(link)), date1)
+            self.assertTrue(os.path.exists(os.path.join(link, "log1")))
+
+        with freeze_time(date2):
+            handler.set_context(filename=os.path.join(self.dag_dir, "log2"))
+            self.assertTrue(os.path.islink(link))
+            self.assertEqual(os.path.basename(os.readlink(link)), date2)
+            self.assertTrue(os.path.exists(os.path.join(link, "log2")))
+
+    def tearDown(self):
+        shutil.rmtree(self.base_log_folder, ignore_errors=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/utils/log/test_logging.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_logging.py b/tests/utils/log/test_logging.py
deleted file mode 100644
index 57f869f..0000000
--- a/tests/utils/log/test_logging.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import mock
-import unittest
-
-from airflow.utils.log.s3_task_handler import S3TaskHandler
-
-
-class TestS3TaskHandler(unittest.TestCase):
-
-    def setUp(self):
-        super(S3TaskHandler, self).setUp()
-        self.remote_log_location = 'remote/log/location'
-        self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook")
-        self.hook_mock = self.hook_patcher.start()
-        self.hook_inst_mock = self.hook_mock.return_value
-        self.hook_key_mock = self.hook_inst_mock.get_key.return_value
-        self.hook_key_mock.get_contents_as_string.return_value.decode.\
-            return_value = 'content'
-
-    def tearDown(self):
-        self.hook_patcher.stop()
-        super(S3TaskHandler, self).tearDown()
-
-    def test_init(self):
-        S3TaskHandler()
-        self.hook_mock.assert_called_once_with('')
-
-    def test_init_raises(self):
-        self.hook_mock.side_effect = Exception('Failed to connect')
-        handler = S3TaskHandler()
-        with mock.patch.object(handler.log, 'error') as mock_error:
-            # Initialize the hook
-            handler.hook()
-            mock_error.assert_called_once_with(
-                'Could not create an S3Hook with connection id "". Please make '
-                'sure that airflow[s3] is installed and the S3 connection exists.'
-            )
-
-    def test_log_exists(self):
-        self.assertTrue(S3TaskHandler().log_exists(self.remote_log_location))
-
-    def test_log_exists_none(self):
-        self.hook_inst_mock.get_key.return_value = None
-        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
-
-    def test_log_exists_raises(self):
-        self.hook_inst_mock.get_key.side_effect = Exception('error')
-        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
-
-    def test_log_exists_false(self):
-        self.hook_inst_mock.check_for_key.return_value = False
-        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
-
-    def test_log_exists_no_hook(self):
-        self.hook_mock.side_effect = Exception('Failed to connect')
-        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
-
-    def test_read(self):
-        self.assertEqual(
-            S3TaskHandler().read(self.remote_log_location),
-            'content'
-        )
-
-    def test_read_key_empty(self):
-        self.hook_inst_mock.get_key.return_value = None
-        self.assertEqual(S3TaskHandler().read(self.remote_log_location), '')
-
-    def test_read_raises(self):
-        self.hook_inst_mock.get_key.side_effect = Exception('error')
-        self.assertEqual(S3TaskHandler().read(self.remote_log_location), '')
-
-    def test_read_raises_return_error(self):
-        self.hook_inst_mock.get_key.side_effect = Exception('error')
-        handler = S3TaskHandler()
-        with mock.patch.object(handler.log, 'error') as mock_error:
-            result = handler.s3_log_read(
-                self.remote_log_location,
-                return_error=True
-            )
-            msg = 'Could not read logs from %s' % self.remote_log_location
-            self.assertEqual(result, msg)
-            mock_error.assert_called_once_with(msg)
-
-    def test_write(self):
-        S3TaskHandler().write('text', self.remote_log_location)
-        self.hook_inst_mock.load_string.assert_called_once_with(
-            'content\ntext',
-            key=self.remote_log_location,
-            replace=True,
-            encrypt=False,
-        )
-
-    def test_write_raises(self):
-        self.hook_inst_mock.read_key.return_value = ''
-        self.hook_inst_mock.load_string.side_effect = Exception('error')
-        handler = S3TaskHandler()
-        with mock.patch.object(handler.log, 'error') as mock_error:
-            handler.write('text', self.remote_log_location)
-            msg = 'Could not write logs to %s' % self.remote_log_location
-            mock_error.assert_called_once_with(msg)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ee4e474/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
new file mode 100644
index 0000000..0438630
--- /dev/null
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -0,0 +1,114 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import unittest
+
+from airflow.utils.log.s3_task_handler import S3TaskHandler
+
+
+@unittest.skip("Non functional S3 tests")
+class TestS3TaskHandler(unittest.TestCase):
+
+    def setUp(self):
+        super(TestS3TaskHandler, self).setUp()
+        self.remote_log_location = 'remote/log/location'
+        self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook")
+        self.hook_mock = self.hook_patcher.start()
+        self.hook_inst_mock = self.hook_mock.return_value
+        self.hook_key_mock = self.hook_inst_mock.get_key.return_value
+        self.hook_key_mock.get_contents_as_string.return_value.decode.\
+            return_value = 'content'
+
+    def tearDown(self):
+        self.hook_patcher.stop()
+        super(TestS3TaskHandler, self).tearDown()
+
+    def test_init(self):
+        S3TaskHandler()
+        self.hook_mock.assert_called_once_with('')
+
+    def test_init_raises(self):
+        self.hook_mock.side_effect = Exception('Failed to connect')
+        handler = S3TaskHandler()
+        with mock.patch.object(handler.log, 'error') as mock_error:
+            # Initialize the hook
+            handler.hook()
+            mock_error.assert_called_once_with(
+                'Could not create an S3Hook with connection id "". Please make '
+                'sure that airflow[s3] is installed and the S3 connection exists.'
+            )
+
+    def test_log_exists(self):
+        self.assertTrue(S3TaskHandler().log_exists(self.remote_log_location))
+
+    def test_log_exists_none(self):
+        self.hook_inst_mock.get_key.return_value = None
+        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
+
+    def test_log_exists_raises(self):
+        self.hook_inst_mock.get_key.side_effect = Exception('error')
+        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
+
+    def test_log_exists_false(self):
+        self.hook_inst_mock.check_for_key.return_value = False
+        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
+
+    def test_log_exists_no_hook(self):
+        self.hook_mock.side_effect = Exception('Failed to connect')
+        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
+
+    def test_read(self):
+        self.assertEqual(
+            S3TaskHandler().read(self.remote_log_location),
+            'content'
+        )
+
+    def test_read_key_empty(self):
+        self.hook_inst_mock.get_key.return_value = None
+        self.assertEqual(S3TaskHandler().read(self.remote_log_location), '')
+
+    def test_read_raises(self):
+        self.hook_inst_mock.get_key.side_effect = Exception('error')
+        self.assertEqual(S3TaskHandler().read(self.remote_log_location), '')
+
+    def test_read_raises_return_error(self):
+        self.hook_inst_mock.get_key.side_effect = Exception('error')
+        handler = S3TaskHandler()
+        with mock.patch.object(handler.log, 'error') as mock_error:
+            result = handler.s3_log_read(
+                self.remote_log_location,
+                return_error=True
+            )
+            msg = 'Could not read logs from %s' % self.remote_log_location
+            self.assertEqual(result, msg)
+            mock_error.assert_called_once_with(msg)
+
+    def test_write(self):
+        S3TaskHandler().write('text', self.remote_log_location)
+        self.hook_inst_mock.load_string.assert_called_once_with(
+            'content\ntext',
+            key=self.remote_log_location,
+            replace=True,
+            encrypt=False,
+        )
+
+    def test_write_raises(self):
+        self.hook_inst_mock.read_key.return_value = ''
+        self.hook_inst_mock.load_string.side_effect = Exception('error')
+        handler = S3TaskHandler()
+        with mock.patch.object(handler.log, 'error') as mock_error:
+            handler.write('text', self.remote_log_location)
+            msg = 'Could not write logs to %s' % self.remote_log_location
+            mock_error.assert_called_once_with(msg)


Reply via email to