Repository: incubator-airflow Updated Branches: refs/heads/master 28411b1e7 -> 715602ce6
[AIRFLOW-1756] Fix S3TaskHandler to work with Boto3-based S3Hook The change from boto2 to boto3 in S3Hook caused this to break (the return type of `hook.get_key()` changed). There's a better method designed for that we should use anyway. This wasn't caught by the tests as the mocks weren't updated. Rather than mocking the return of the hook I have changed it to use "moto" (already in use elsewhere in the tests) to mock at the S3 layer, not our hook. Closes #2773 from ashb/AIRFLOW-1756-s3-logging- boto3-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/715602ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/715602ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/715602ce Branch: refs/heads/master Commit: 715602ce6a78d773ca85397cf8a0fa85afe42b74 Parents: 28411b1 Author: Ash Berlin-Taylor <ash_git...@firemirror.com> Authored: Thu Nov 9 21:57:04 2017 +0100 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Thu Nov 9 21:57:04 2017 +0100 ---------------------------------------------------------------------- airflow/utils/log/s3_task_handler.py | 12 ++- tests/utils/log/test_s3_task_handler.py | 128 +++++++++++++++------------ 2 files changed, 74 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/715602ce/airflow/utils/log/s3_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index 1e56655..cfa966a 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -127,14 +127,12 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): :type return_error: bool """ try: - s3_key = self.hook.get_key(remote_log_location) - if s3_key: - return 
s3_key.get_contents_as_string().decode() + return self.hook.read_key(remote_log_location) except: + msg = 'Could not read logs from {}'.format(remote_log_location) + self.log.exception(msg) # return error if needed if return_error: - msg = 'Could not read logs from {}'.format(remote_log_location) - self.log.error(msg) return msg def s3_write(self, log, remote_log_location, append=True): @@ -149,7 +147,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): the new log is appended to any existing logs. :type append: bool """ - if append: + if append and self.s3_log_exists(remote_log_location): old_log = self.s3_read(remote_log_location) log = '\n'.join([old_log, log]) if old_log else log @@ -161,4 +159,4 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'), ) except: - self.log.error('Could not write logs to %s', remote_log_location) + self.log.exception('Could not write logs to %s', remote_log_location) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/715602ce/tests/utils/log/test_s3_task_handler.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py index da879b6..b1354cd 100644 --- a/tests/utils/log/test_s3_task_handler.py +++ b/tests/utils/log/test_s3_task_handler.py @@ -12,46 +12,66 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from datetime import datetime import mock import unittest +from airflow import configuration from airflow.utils.log.s3_task_handler import S3TaskHandler +from airflow.hooks.S3_hook import S3Hook +from airflow.models import TaskInstance, DAG +from airflow.operators.dummy_operator import DummyOperator +try: + import boto3 + import moto + from moto import mock_s3 +except ImportError: + mock_s3 = None + +@unittest.skipIf(mock_s3 is None, + "Skipping test because moto.mock_s3 is not available") +@mock_s3 class TestS3TaskHandler(unittest.TestCase): def setUp(self): super(TestS3TaskHandler, self).setUp() - self.remote_log_location = 'remote/log/location' + self.remote_log_location = 's3://bucket/remote/log/location' + self.remote_log_key = 'remote/log/location' self.local_log_location = 'local/log/location' - self.s3_log_location = 's3/log/location' - self.filename_template = '' - self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook") - self.hook_mock = self.hook_patcher.start() - self.hook_inst_mock = self.hook_mock.return_value - self.hook_key_mock = self.hook_inst_mock.get_key.return_value - self.hook_key_mock.get_contents_as_string.return_value.decode.\ - return_value = 'content' + self.filename_template = '{try_number}.log' self.s3_task_handler = S3TaskHandler( self.local_log_location, - self.s3_log_location, + self.remote_log_location, self.filename_template ) - def tearDown(self): - self.hook_patcher.stop() - super(TestS3TaskHandler, self).tearDown() + configuration.load_test_config() + date = datetime(2016, 1, 1) + self.dag = DAG('dag_for_testing_file_task_handler', start_date=date) + task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=self.dag) + self.ti = TaskInstance(task=task, execution_date=date) + self.ti.try_number = 1 + self.addCleanup(self.dag.clear) + + self.conn = boto3.client('s3') + # We need to create the bucket since this is all in Moto's 'virtual' + # AWS account + moto.core.moto_api_backend.reset() + 
self.conn.create_bucket(Bucket="bucket") - def test_init(self): - self.s3_task_handler.hook() - self.hook_mock.assert_called_once_with('') + def test_hook(self): + self.assertIsInstance(self.s3_task_handler.hook, S3Hook) - def test_init_raises(self): - self.hook_mock.side_effect = Exception('Failed to connect') + def test_hook_raises(self): handler = self.s3_task_handler with mock.patch.object(handler.log, 'error') as mock_error: - # Initialize the hook - handler.hook + with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook: + mock_hook.side_effect = Exception('Failed to connect') + # Initialize the hook + handler.hook + mock_error.assert_called_once_with( 'Could not create an S3Hook with connection id "%s". Please make ' 'sure that airflow[s3] is installed and the S3 connection exists.', @@ -59,66 +79,56 @@ class TestS3TaskHandler(unittest.TestCase): ) def test_log_exists(self): + self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'') self.assertTrue(self.s3_task_handler.s3_log_exists(self.remote_log_location)) def test_log_exists_none(self): - self.hook_inst_mock.get_key.return_value = None self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) def test_log_exists_raises(self): - self.hook_inst_mock.get_key.side_effect = Exception('error') - self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) - - def test_log_exists_false(self): - self.hook_inst_mock.get_key.return_value = None - self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) + self.assertFalse(self.s3_task_handler.s3_log_exists('s3://nonexistentbucket/foo')) def test_log_exists_no_hook(self): - self.hook_mock.side_effect = Exception('Failed to connect') - self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) + with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook: + mock_hook.side_effect = Exception('Failed to connect') + 
self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) def test_read(self): + self.conn.put_object(Bucket='bucket', Key='remote/log/location/1.log', Body=b'Log line\n') self.assertEqual( - self.s3_task_handler.s3_read(self.remote_log_location), - 'content' + self.s3_task_handler.read(self.ti), + ['*** Reading remote log from s3://bucket/remote/log/location/1.log.\nLog line\n\n'] ) - def test_read_key_empty(self): - self.hook_inst_mock.get_key.return_value = None - self.assertEqual(self.s3_task_handler.s3_read(self.remote_log_location), None) - - def test_read_raises(self): - self.hook_inst_mock.get_key.side_effect = Exception('error') - self.assertEqual(self.s3_task_handler.s3_read(self.remote_log_location), None) - def test_read_raises_return_error(self): - self.hook_inst_mock.get_key.side_effect = Exception('error') handler = self.s3_task_handler + url = 's3://nonexistentbucket/foo' with mock.patch.object(handler.log, 'error') as mock_error: - result = handler.s3_read( - self.remote_log_location, - return_error=True - ) - msg = 'Could not read logs from %s' % self.remote_log_location + result = handler.s3_read(url, return_error=True) + msg = 'Could not read logs from %s' % url self.assertEqual(result, msg) - mock_error.assert_called_once_with(msg) + mock_error.assert_called_once_with(msg, exc_info=True) def test_write(self): + with mock.patch.object(self.s3_task_handler.log, 'error') as mock_error: + self.s3_task_handler.s3_write('text', self.remote_log_location) + # We shouldn't expect any error logs in the default working case. 
+ mock_error.assert_not_called() + body = boto3.resource('s3').Object('bucket', self.remote_log_key).get()['Body'].read() + + self.assertEqual(body, b'text') + + def test_write_existing(self): + self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'previous ') self.s3_task_handler.s3_write('text', self.remote_log_location) - self.hook_inst_mock.load_string.assert_called_once_with( - 'content\ntext', - key=self.remote_log_location, - replace=True, - encrypt=False, - ) + body = boto3.resource('s3').Object('bucket', self.remote_log_key).get()['Body'].read() + + self.assertEqual(body, b'previous \ntext') def test_write_raises(self): - self.hook_inst_mock.read_key.return_value = '' - self.hook_inst_mock.load_string.side_effect = Exception('error') handler = self.s3_task_handler + url = 's3://nonexistentbucket/foo' with mock.patch.object(handler.log, 'error') as mock_error: - handler.s3_write('text', self.remote_log_location) - mock_error.assert_called_once_with( - 'Could not write logs to %s', - 'remote/log/location' - ) + handler.s3_write('text', url) + self.assertEqual + mock_error.assert_called_once_with('Could not write logs to %s', url, exc_info=True)