Repository: incubator-airflow
Updated Branches:
  refs/heads/master f5c845739 -> d9109d645


[AIRFLOW-1486] Unexpected S3 writing log error

Removed unexpected S3 writing log error and added tests for s3 logging.

Closes #2499 from skudriashev/airflow-1486


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d9109d64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d9109d64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d9109d64

Branch: refs/heads/master
Commit: d9109d6458d136cd2b76ef7180be498ae09b3ea3
Parents: f5c8457
Author: Stanislav Kudriashev <stas.kudrias...@gmail.com>
Authored: Mon Aug 7 15:52:51 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Aug 7 15:52:54 2017 -0700

----------------------------------------------------------------------
 airflow/utils/logging.py    |  40 +++++++--------
 tests/utils/test_logging.py | 107 +++++++++++++++++++++++++++++++++++----
 2 files changed, 117 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9109d64/airflow/utils/logging.py
----------------------------------------------------------------------
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index b86d839..6e18e52 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -91,10 +91,13 @@ class S3Log(object):
             except:
                 pass
 
-        # raise/return error if we get here
-        err = 'Could not read logs from {}'.format(remote_log_location)
-        logging.error(err)
-        return err if return_error else ''
+        # return error if needed
+        if return_error:
+            msg = 'Could not read logs from {}'.format(remote_log_location)
+            logging.error(msg)
+            return msg
+
+        return ''
 
     def write(self, log, remote_log_location, append=True):
         """
@@ -108,25 +111,21 @@ class S3Log(object):
         :param append: if False, any existing log file is overwritten. If True,
             the new log is appended to any existing logs.
         :type append: bool
-
         """
         if self.hook:
-
             if append:
                 old_log = self.read(remote_log_location)
-                log = old_log + '\n' + log
+                log = '\n'.join([old_log, log])
+
             try:
                 self.hook.load_string(
                     log,
                     key=remote_log_location,
                     replace=True,
+                    encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'))
-                return
+                    encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'),
+                )
             except:
-                pass
-
-        # raise/return error if we get here
-        logging.error('Could not write logs to {}'.format(remote_log_location))
+                logging.error('Could not write logs to {}'.format(remote_log_location))
 
 
 class GCSLog(object):
@@ -183,10 +182,13 @@ class GCSLog(object):
             except:
                 pass
 
-        # raise/return error if we get here
-        err = 'Could not read logs from {}'.format(remote_log_location)
-        logging.error(err)
-        return err if return_error else ''
+        # return error if needed
+        if return_error:
+            msg = 'Could not read logs from {}'.format(remote_log_location)
+            logging.error(msg)
+            return msg
+
+        return ''
 
     def write(self, log, remote_log_location, append=True):
         """
@@ -200,12 +202,11 @@ class GCSLog(object):
         :param append: if False, any existing log file is overwritten. If True,
             the new log is appended to any existing logs.
         :type append: bool
-
         """
         if self.hook:
             if append:
                 old_log = self.read(remote_log_location)
-                log = old_log + '\n' + log
+                log = '\n'.join([old_log, log])
 
             try:
                 bkt, blob = self.parse_gcs_url(remote_log_location)
@@ -218,7 +219,6 @@ class GCSLog(object):
                     tmpfile.flush()
                     self.hook.upload(bkt, blob, tmpfile.name)
             except:
-                # raise/return error if we get here
                 logging.error('Could not write logs to {}'.format(remote_log_location))
 
     def parse_gcs_url(self, gsurl):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9109d64/tests/utils/test_logging.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py
index 474430f..3f9f5d6 100644
--- a/tests/utils/test_logging.py
+++ b/tests/utils/test_logging.py
@@ -14,16 +14,103 @@
 
 import unittest
 
-from airflow.exceptions import AirflowException
-from airflow.utils import logging as logging_utils
-from datetime import datetime, timedelta
+import mock
 
-class Logging(unittest.TestCase):
+from airflow.utils import logging
+from datetime import datetime
+
+
+class TestLogging(unittest.TestCase):
 
     def test_get_log_filename(self):
-        dag_id = 'dag_id'
-        task_id = 'task_id'
-        execution_date = datetime(2017, 1, 1, 0, 0, 0)
-        try_number = 0
-        filename = logging_utils.get_log_filename(dag_id, task_id, execution_date, try_number)
-        self.assertEqual(filename, 'dag_id/task_id/2017-01-01T00:00:00/1.log')
+        self.assertEqual(
+            logging.get_log_filename(
+                dag_id='dag_id',
+                task_id='task_id',
+                execution_date=datetime(2017, 1, 1, 0, 0, 0),
+                try_number=0,
+            ),
+            'dag_id/task_id/2017-01-01T00:00:00/1.log',
+        )
+
+
+class TestS3Log(unittest.TestCase):
+
+    def setUp(self):
+        super(TestS3Log, self).setUp()
+        self.remote_log_location = 'remote/log/location'
+        self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook")
+        self.hook_mock = self.hook_patcher.start()
+        self.hook_inst_mock = self.hook_mock.return_value
+        self.hook_key_mock = self.hook_inst_mock.get_key.return_value
+        self.hook_key_mock.get_contents_as_string.return_value.decode.\
+            return_value = 'content'
+        self.logging_patcher = mock.patch("airflow.utils.logging.logging")
+        self.logging_mock = self.logging_patcher.start()
+
+    def tearDown(self):
+        self.logging_patcher.stop()
+        self.hook_patcher.stop()
+        super(TestS3Log, self).tearDown()
+
+    def test_init(self):
+        logging.S3Log()
+        self.hook_mock.assert_called_once_with('')
+
+    def test_init_raises(self):
+        self.hook_mock.side_effect = Exception('Failed to connect')
+        logging.S3Log()
+        self.logging_mock.error.assert_called_once_with(
+            'Could not create an S3Hook with connection id "". Please make '
+            'sure that airflow[s3] is installed and the S3 connection exists.'
+        )
+
+    def test_log_exists(self):
+        self.assertTrue(logging.S3Log().log_exists(self.remote_log_location))
+
+    def test_log_exists_none(self):
+        self.hook_inst_mock.get_key.return_value = None
+        self.assertFalse(logging.S3Log().log_exists(self.remote_log_location))
+
+    def test_log_exists_raises(self):
+        self.hook_inst_mock.get_key.side_effect = Exception('error')
+        self.assertFalse(logging.S3Log().log_exists(self.remote_log_location))
+
+    def test_log_exists_no_hook(self):
+        self.hook_mock.side_effect = Exception('Failed to connect')
+        self.assertFalse(logging.S3Log().log_exists(self.remote_log_location))
+
+    def test_read(self):
+        self.assertEqual(logging.S3Log().read(self.remote_log_location),
+                         'content')
+
+    def test_read_key_empty(self):
+        self.hook_inst_mock.get_key.return_value = None
+        self.assertEqual(logging.S3Log().read(self.remote_log_location), '')
+
+    def test_read_raises(self):
+        self.hook_inst_mock.get_key.side_effect = Exception('error')
+        self.assertEqual(logging.S3Log().read(self.remote_log_location), '')
+
+    def test_read_raises_return_error(self):
+        self.hook_inst_mock.get_key.side_effect = Exception('error')
+        result = logging.S3Log().read(self.remote_log_location,
+                                      return_error=True)
+        msg = 'Could not read logs from %s' % self.remote_log_location
+        self.assertEqual(result, msg)
+        self.logging_mock.error.assert_called_once_with(msg)
+
+    def test_write(self):
+        logging.S3Log().write('text', self.remote_log_location)
+        self.hook_inst_mock.load_string.assert_called_once_with(
+            'content\ntext',
+            key=self.remote_log_location,
+            replace=True,
+            encrypt=False,
+        )
+
+    def test_write_raises(self):
+        self.hook_inst_mock.load_string.side_effect = Exception('error')
+        logging.S3Log().write('text', self.remote_log_location)
+        msg = 'Could not write logs to %s' % self.remote_log_location
+        self.logging_mock.error.assert_called_once_with(msg)

Reply via email to