Repository: incubator-airflow Updated Branches: refs/heads/master f5c845739 -> d9109d645
[AIRFLOW-1486] Unexpected S3 writing log error Removed unexpected S3 writing log error and added tests for s3 logging. Closes #2499 from skudriashev/airflow-1486 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d9109d64 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d9109d64 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d9109d64 Branch: refs/heads/master Commit: d9109d6458d136cd2b76ef7180be498ae09b3ea3 Parents: f5c8457 Author: Stanislav Kudriashev <stas.kudrias...@gmail.com> Authored: Mon Aug 7 15:52:51 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Aug 7 15:52:54 2017 -0700 ---------------------------------------------------------------------- airflow/utils/logging.py | 40 +++++++-------- tests/utils/test_logging.py | 107 +++++++++++++++++++++++++++++++++++---- 2 files changed, 117 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9109d64/airflow/utils/logging.py ---------------------------------------------------------------------- diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py index b86d839..6e18e52 100644 --- a/airflow/utils/logging.py +++ b/airflow/utils/logging.py @@ -91,10 +91,13 @@ class S3Log(object): except: pass - # raise/return error if we get here - err = 'Could not read logs from {}'.format(remote_log_location) - logging.error(err) - return err if return_error else '' + # return error if needed + if return_error: + msg = 'Could not read logs from {}'.format(remote_log_location) + logging.error(msg) + return msg + + return '' def write(self, log, remote_log_location, append=True): """ @@ -108,25 +111,21 @@ class S3Log(object): :param append: if False, any existing log file is overwritten. If True, the new log is appended to any existing logs. :type append: bool - """ if self.hook: - if append: old_log = self.read(remote_log_location) - log = old_log + '\n' + log + log = '\n'.join([old_log, log]) + try: self.hook.load_string( log, key=remote_log_location, replace=True, - encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS')) - return + encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'), + ) except: - pass - - # raise/return error if we get here - logging.error('Could not write logs to {}'.format(remote_log_location)) + logging.error('Could not write logs to {}'.format(remote_log_location)) class GCSLog(object): @@ -183,10 +182,13 @@ class GCSLog(object): except: pass - # raise/return error if we get here - err = 'Could not read logs from {}'.format(remote_log_location) - logging.error(err) - return err if return_error else '' + # return error if needed + if return_error: + msg = 'Could not read logs from {}'.format(remote_log_location) + logging.error(msg) + return msg + + return '' def write(self, log, remote_log_location, append=True): """ @@ -200,12 +202,11 @@ class GCSLog(object): :param append: if False, any existing log file is overwritten. If True, the new log is appended to any existing logs. :type append: bool - """ if self.hook: if append: old_log = self.read(remote_log_location) - log = old_log + '\n' + log + log = '\n'.join([old_log, log]) try: bkt, blob = self.parse_gcs_url(remote_log_location) @@ -218,7 +219,6 @@ class GCSLog(object): tmpfile.flush() self.hook.upload(bkt, blob, tmpfile.name) except: - # raise/return error if we get here logging.error('Could not write logs to {}'.format(remote_log_location)) def parse_gcs_url(self, gsurl): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d9109d64/tests/utils/test_logging.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py index 474430f..3f9f5d6 100644 --- a/tests/utils/test_logging.py +++ b/tests/utils/test_logging.py @@ -14,16 +14,103 @@ import unittest -from airflow.exceptions import AirflowException -from airflow.utils import logging as logging_utils -from datetime import datetime, timedelta +import mock -class Logging(unittest.TestCase): +from airflow.utils import logging +from datetime import datetime + + +class TestLogging(unittest.TestCase): def test_get_log_filename(self): - dag_id = 'dag_id' - task_id = 'task_id' - execution_date = datetime(2017, 1, 1, 0, 0, 0) - try_number = 0 - filename = logging_utils.get_log_filename(dag_id, task_id, execution_date, try_number) - self.assertEqual(filename, 'dag_id/task_id/2017-01-01T00:00:00/1.log') + self.assertEqual( + logging.get_log_filename( + dag_id='dag_id', + task_id='task_id', + execution_date=datetime(2017, 1, 1, 0, 0, 0), + try_number=0, + ), + 'dag_id/task_id/2017-01-01T00:00:00/1.log', + ) + + +class TestS3Log(unittest.TestCase): + + def setUp(self): + super(TestS3Log, self).setUp() + self.remote_log_location = 'remote/log/location' + self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook") + self.hook_mock = self.hook_patcher.start() + self.hook_inst_mock = self.hook_mock.return_value + self.hook_key_mock = self.hook_inst_mock.get_key.return_value + self.hook_key_mock.get_contents_as_string.return_value.decode.\ + return_value = 'content' + self.logging_patcher = mock.patch("airflow.utils.logging.logging") + self.logging_mock = self.logging_patcher.start() + + def tearDown(self): + self.logging_patcher.stop() + self.hook_patcher.stop() + super(TestS3Log, self).tearDown() + + def test_init(self): + logging.S3Log() + self.hook_mock.assert_called_once_with('') + + def test_init_raises(self): + self.hook_mock.side_effect = Exception('Failed to connect') + logging.S3Log() + self.logging_mock.error.assert_called_once_with( + 'Could not create an S3Hook with connection id "". Please make ' + 'sure that airflow[s3] is installed and the S3 connection exists.' + ) + + def test_log_exists(self): + self.assertTrue(logging.S3Log().log_exists(self.remote_log_location)) + + def test_log_exists_none(self): + self.hook_inst_mock.get_key.return_value = None + self.assertFalse(logging.S3Log().log_exists(self.remote_log_location)) + + def test_log_exists_raises(self): + self.hook_inst_mock.get_key.side_effect = Exception('error') + self.assertFalse(logging.S3Log().log_exists(self.remote_log_location)) + + def test_log_exists_no_hook(self): + self.hook_mock.side_effect = Exception('Failed to connect') + self.assertFalse(logging.S3Log().log_exists(self.remote_log_location)) + + def test_read(self): + self.assertEqual(logging.S3Log().read(self.remote_log_location), + 'content') + + def test_read_key_empty(self): + self.hook_inst_mock.get_key.return_value = None + self.assertEqual(logging.S3Log().read(self.remote_log_location), '') + + def test_read_raises(self): + self.hook_inst_mock.get_key.side_effect = Exception('error') + self.assertEqual(logging.S3Log().read(self.remote_log_location), '') + + def test_read_raises_return_error(self): + self.hook_inst_mock.get_key.side_effect = Exception('error') + result = logging.S3Log().read(self.remote_log_location, + return_error=True) + msg = 'Could not read logs from %s' % self.remote_log_location + self.assertEqual(result, msg) + self.logging_mock.error.assert_called_once_with(msg) + + def test_write(self): + logging.S3Log().write('text', self.remote_log_location) + self.hook_inst_mock.load_string.assert_called_once_with( + 'content\ntext', + key=self.remote_log_location, + replace=True, + encrypt=False, + ) + + def test_write_raises(self): + self.hook_inst_mock.load_string.side_effect = Exception('error') + logging.S3Log().write('text', self.remote_log_location) + msg = 'Could not write logs to %s' % self.remote_log_location + self.logging_mock.error.assert_called_once_with(msg)