Repository: incubator-airflow Updated Branches: refs/heads/v1-9-stable 6b7c17d17 -> f7afd5a98
[AIRFLOW-1756] Fix S3TaskHandler to work with Boto3-based S3Hook The change from boto2 to boto3 in S3Hook caused this to break (the return type of `hook.get_key()` changed). There's a better method designed for that we should use anyway. This wasn't caught by the tests as the mocks weren't updated. Rather than mocking the return of the hook I have changed it to use "moto" (already in use elsewhere in the tests) to mock at the S3 layer, not our hook. Closes #2773 from ashb/AIRFLOW-1756-s3-logging-boto3-fix (cherry picked from commit 715602ce6a78d773ca85397cf8a0fa85afe42b74) Signed-off-by: Bolke de Bruin <bo...@xs4all.nl> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f7afd5a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f7afd5a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f7afd5a9 Branch: refs/heads/v1-9-stable Commit: f7afd5a9858218b839fec10774dfadfd1a478969 Parents: 6b7c17d Author: Ash Berlin-Taylor <ash_git...@firemirror.com> Authored: Thu Nov 9 21:57:04 2017 +0100 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Thu Nov 9 22:00:51 2017 +0100 ---------------------------------------------------------------------- airflow/utils/log/s3_task_handler.py | 12 +-- tests/utils/log/test_s3_task_handler.py | 152 +++++++++++++++------------ 2 files changed, 91 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f7afd5a9/airflow/utils/log/s3_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index 1e56655..cfa966a 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -127,14 +127,12 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): :type 
return_error: bool """ try: - s3_key = self.hook.get_key(remote_log_location) - if s3_key: - return s3_key.get_contents_as_string().decode() + return self.hook.read_key(remote_log_location) except: + msg = 'Could not read logs from {}'.format(remote_log_location) + self.log.exception(msg) # return error if needed if return_error: - msg = 'Could not read logs from {}'.format(remote_log_location) - self.log.error(msg) return msg def s3_write(self, log, remote_log_location, append=True): @@ -149,7 +147,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): the new log is appended to any existing logs. :type append: bool """ - if append: + if append and self.s3_log_exists(remote_log_location): old_log = self.s3_read(remote_log_location) log = '\n'.join([old_log, log]) if old_log else log @@ -161,4 +159,4 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'), ) except: - self.log.error('Could not write logs to %s', remote_log_location) + self.log.exception('Could not write logs to %s', remote_log_location) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f7afd5a9/tests/utils/log/test_s3_task_handler.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py index 0438630..b1354cd 100644 --- a/tests/utils/log/test_s3_task_handler.py +++ b/tests/utils/log/test_s3_task_handler.py @@ -12,103 +12,123 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from datetime import datetime import mock import unittest +from airflow import configuration from airflow.utils.log.s3_task_handler import S3TaskHandler +from airflow.hooks.S3_hook import S3Hook +from airflow.models import TaskInstance, DAG +from airflow.operators.dummy_operator import DummyOperator +try: + import boto3 + import moto + from moto import mock_s3 +except ImportError: + mock_s3 = None -@unittest.skip("Non functional S3 tests") + +@unittest.skipIf(mock_s3 is None, + "Skipping test because moto.mock_s3 is not available") +@mock_s3 class TestS3TaskHandler(unittest.TestCase): def setUp(self): super(TestS3TaskHandler, self).setUp() - self.remote_log_location = 'remote/log/location' - self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook") - self.hook_mock = self.hook_patcher.start() - self.hook_inst_mock = self.hook_mock.return_value - self.hook_key_mock = self.hook_inst_mock.get_key.return_value - self.hook_key_mock.get_contents_as_string.return_value.decode.\ - return_value = 'content' - - def tearDown(self): - self.hook_patcher.stop() - super(TestS3TaskHandler, self).tearDown() - - def test_init(self): - S3TaskHandler() - self.hook_mock.assert_called_once_with('') - - def test_init_raises(self): - self.hook_mock.side_effect = Exception('Failed to connect') - handler = S3TaskHandler() + self.remote_log_location = 's3://bucket/remote/log/location' + self.remote_log_key = 'remote/log/location' + self.local_log_location = 'local/log/location' + self.filename_template = '{try_number}.log' + self.s3_task_handler = S3TaskHandler( + self.local_log_location, + self.remote_log_location, + self.filename_template + ) + + configuration.load_test_config() + date = datetime(2016, 1, 1) + self.dag = DAG('dag_for_testing_file_task_handler', start_date=date) + task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=self.dag) + self.ti = TaskInstance(task=task, execution_date=date) + self.ti.try_number = 1 + self.addCleanup(self.dag.clear) + + 
self.conn = boto3.client('s3') + # We need to create the bucket since this is all in Moto's 'virtual' + # AWS account + moto.core.moto_api_backend.reset() + self.conn.create_bucket(Bucket="bucket") + + def test_hook(self): + self.assertIsInstance(self.s3_task_handler.hook, S3Hook) + + def test_hook_raises(self): + handler = self.s3_task_handler with mock.patch.object(handler.log, 'error') as mock_error: - # Initialize the hook - handler.hook() + with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook: + mock_hook.side_effect = Exception('Failed to connect') + # Initialize the hook + handler.hook + mock_error.assert_called_once_with( - 'Could not create an S3Hook with connection id "". Please make ' - 'sure that airflow[s3] is installed and the S3 connection exists.' + 'Could not create an S3Hook with connection id "%s". Please make ' + 'sure that airflow[s3] is installed and the S3 connection exists.', + '' ) def test_log_exists(self): - self.assertTrue(S3TaskHandler().log_exists(self.remote_log_location)) + self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'') + self.assertTrue(self.s3_task_handler.s3_log_exists(self.remote_log_location)) def test_log_exists_none(self): - self.hook_inst_mock.get_key.return_value = None - self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) + self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) def test_log_exists_raises(self): - self.hook_inst_mock.get_key.side_effect = Exception('error') - self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) - - def test_log_exists_false(self): - self.hook_inst_mock.check_for_key.return_value = False - self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) + self.assertFalse(self.s3_task_handler.s3_log_exists('s3://nonexistentbucket/foo')) def test_log_exists_no_hook(self): - self.hook_mock.side_effect = Exception('Failed to connect') - 
self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location)) + with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook: + mock_hook.side_effect = Exception('Failed to connect') + self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) def test_read(self): + self.conn.put_object(Bucket='bucket', Key='remote/log/location/1.log', Body=b'Log line\n') self.assertEqual( - S3TaskHandler().read(self.remote_log_location), - 'content' + self.s3_task_handler.read(self.ti), + ['*** Reading remote log from s3://bucket/remote/log/location/1.log.\nLog line\n\n'] ) - def test_read_key_empty(self): - self.hook_inst_mock.get_key.return_value = None - self.assertEqual(S3TaskHandler().read(self.remote_log_location), '') - - def test_read_raises(self): - self.hook_inst_mock.get_key.side_effect = Exception('error') - self.assertEqual(S3TaskHandler().read(self.remote_log_location), '') - def test_read_raises_return_error(self): - self.hook_inst_mock.get_key.side_effect = Exception('error') - handler = S3TaskHandler() + handler = self.s3_task_handler + url = 's3://nonexistentbucket/foo' with mock.patch.object(handler.log, 'error') as mock_error: - result = handler.s3_log_read( - self.remote_log_location, - return_error=True - ) - msg = 'Could not read logs from %s' % self.remote_log_location + result = handler.s3_read(url, return_error=True) + msg = 'Could not read logs from %s' % url self.assertEqual(result, msg) - mock_error.assert_called_once_with(msg) + mock_error.assert_called_once_with(msg, exc_info=True) def test_write(self): - S3TaskHandler().write('text', self.remote_log_location) - self.hook_inst_mock.load_string.assert_called_once_with( - 'content\ntext', - key=self.remote_log_location, - replace=True, - encrypt=False, - ) + with mock.patch.object(self.s3_task_handler.log, 'error') as mock_error: + self.s3_task_handler.s3_write('text', self.remote_log_location) + # We shouldn't expect any error logs in the default working case. 
+ mock_error.assert_not_called() + body = boto3.resource('s3').Object('bucket', self.remote_log_key).get()['Body'].read() + + self.assertEqual(body, b'text') + + def test_write_existing(self): + self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'previous ') + self.s3_task_handler.s3_write('text', self.remote_log_location) + body = boto3.resource('s3').Object('bucket', self.remote_log_key).get()['Body'].read() + + self.assertEqual(body, b'previous \ntext') def test_write_raises(self): - self.hook_inst_mock.read_key.return_value = '' - self.hook_inst_mock.load_string.side_effect = Exception('error') - handler = S3TaskHandler() + handler = self.s3_task_handler + url = 's3://nonexistentbucket/foo' with mock.patch.object(handler.log, 'error') as mock_error: - handler.write('text', self.remote_log_location) - msg = 'Could not write logs to %s' % self.remote_log_location - mock_error.assert_called_once_with(msg) + handler.s3_write('text', url) + self.assertEqual + mock_error.assert_called_once_with('Could not write logs to %s', url, exc_info=True)