Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-stable 6b7c17d17 -> f7afd5a98


[AIRFLOW-1756] Fix S3TaskHandler to work with Boto3-based S3Hook

The change from boto2 to boto3 in S3Hook caused
this to break (the
return type of `hook.get_key()` changed). There's a
better method
designed for this that we should use anyway.

This wasn't caught by the tests as the mocks
weren't updated. Rather
than mocking the return of the hook I have changed
it to use "moto"
(already in use elsewhere in the tests) to mock at
the S3 layer, not
our hook.

Closes #2773 from ashb/AIRFLOW-1756-s3-logging-
boto3-fix

(cherry picked from commit 715602ce6a78d773ca85397cf8a0fa85afe42b74)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f7afd5a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f7afd5a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f7afd5a9

Branch: refs/heads/v1-9-stable
Commit: f7afd5a9858218b839fec10774dfadfd1a478969
Parents: 6b7c17d
Author: Ash Berlin-Taylor <ash_git...@firemirror.com>
Authored: Thu Nov 9 21:57:04 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Nov 9 22:00:51 2017 +0100

----------------------------------------------------------------------
 airflow/utils/log/s3_task_handler.py    |  12 +--
 tests/utils/log/test_s3_task_handler.py | 152 +++++++++++++++------------
 2 files changed, 91 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f7afd5a9/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py 
b/airflow/utils/log/s3_task_handler.py
index 1e56655..cfa966a 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -127,14 +127,12 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         :type return_error: bool
         """
         try:
-            s3_key = self.hook.get_key(remote_log_location)
-            if s3_key:
-                return s3_key.get_contents_as_string().decode()
+            return self.hook.read_key(remote_log_location)
         except:
+            msg = 'Could not read logs from {}'.format(remote_log_location)
+            self.log.exception(msg)
             # return error if needed
             if return_error:
-                msg = 'Could not read logs from {}'.format(remote_log_location)
-                self.log.error(msg)
                 return msg
 
     def s3_write(self, log, remote_log_location, append=True):
@@ -149,7 +147,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
             the new log is appended to any existing logs.
         :type append: bool
         """
-        if append:
+        if append and self.s3_log_exists(remote_log_location):
             old_log = self.s3_read(remote_log_location)
             log = '\n'.join([old_log, log]) if old_log else log
 
@@ -161,4 +159,4 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
                 encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'),
             )
         except:
-            self.log.error('Could not write logs to %s', remote_log_location)
+            self.log.exception('Could not write logs to %s', 
remote_log_location)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f7afd5a9/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py 
b/tests/utils/log/test_s3_task_handler.py
index 0438630..b1354cd 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -12,103 +12,123 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import datetime
 import mock
 import unittest
 
+from airflow import configuration
 from airflow.utils.log.s3_task_handler import S3TaskHandler
+from airflow.hooks.S3_hook import S3Hook
+from airflow.models import TaskInstance, DAG
+from airflow.operators.dummy_operator import DummyOperator
 
+try:
+    import boto3
+    import moto
+    from moto import mock_s3
+except ImportError:
+    mock_s3 = None
 
-@unittest.skip("Non functional S3 tests")
+
+@unittest.skipIf(mock_s3 is None,
+                 "Skipping test because moto.mock_s3 is not available")
+@mock_s3
 class TestS3TaskHandler(unittest.TestCase):
 
     def setUp(self):
         super(TestS3TaskHandler, self).setUp()
-        self.remote_log_location = 'remote/log/location'
-        self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook")
-        self.hook_mock = self.hook_patcher.start()
-        self.hook_inst_mock = self.hook_mock.return_value
-        self.hook_key_mock = self.hook_inst_mock.get_key.return_value
-        self.hook_key_mock.get_contents_as_string.return_value.decode.\
-            return_value = 'content'
-
-    def tearDown(self):
-        self.hook_patcher.stop()
-        super(TestS3TaskHandler, self).tearDown()
-
-    def test_init(self):
-        S3TaskHandler()
-        self.hook_mock.assert_called_once_with('')
-
-    def test_init_raises(self):
-        self.hook_mock.side_effect = Exception('Failed to connect')
-        handler = S3TaskHandler()
+        self.remote_log_location = 's3://bucket/remote/log/location'
+        self.remote_log_key = 'remote/log/location'
+        self.local_log_location = 'local/log/location'
+        self.filename_template = '{try_number}.log'
+        self.s3_task_handler = S3TaskHandler(
+            self.local_log_location,
+            self.remote_log_location,
+            self.filename_template
+        )
+
+        configuration.load_test_config()
+        date = datetime(2016, 1, 1)
+        self.dag = DAG('dag_for_testing_file_task_handler', start_date=date)
+        task = DummyOperator(task_id='task_for_testing_file_log_handler', 
dag=self.dag)
+        self.ti = TaskInstance(task=task, execution_date=date)
+        self.ti.try_number = 1
+        self.addCleanup(self.dag.clear)
+
+        self.conn = boto3.client('s3')
+        # We need to create the bucket since this is all in Moto's 'virtual'
+        # AWS account
+        moto.core.moto_api_backend.reset()
+        self.conn.create_bucket(Bucket="bucket")
+
+    def test_hook(self):
+        self.assertIsInstance(self.s3_task_handler.hook, S3Hook)
+
+    def test_hook_raises(self):
+        handler = self.s3_task_handler
         with mock.patch.object(handler.log, 'error') as mock_error:
-            # Initialize the hook
-            handler.hook()
+            with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook:
+                mock_hook.side_effect = Exception('Failed to connect')
+                # Initialize the hook
+                handler.hook
+
             mock_error.assert_called_once_with(
-                'Could not create an S3Hook with connection id "". Please make 
'
-                'sure that airflow[s3] is installed and the S3 connection 
exists.'
+                'Could not create an S3Hook with connection id "%s". Please 
make '
+                'sure that airflow[s3] is installed and the S3 connection 
exists.',
+                ''
             )
 
     def test_log_exists(self):
-        self.assertTrue(S3TaskHandler().log_exists(self.remote_log_location))
+        self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, 
Body=b'')
+        
self.assertTrue(self.s3_task_handler.s3_log_exists(self.remote_log_location))
 
     def test_log_exists_none(self):
-        self.hook_inst_mock.get_key.return_value = None
-        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
+        
self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
 
     def test_log_exists_raises(self):
-        self.hook_inst_mock.get_key.side_effect = Exception('error')
-        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
-
-    def test_log_exists_false(self):
-        self.hook_inst_mock.check_for_key.return_value = False
-        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
+        
self.assertFalse(self.s3_task_handler.s3_log_exists('s3://nonexistentbucket/foo'))
 
     def test_log_exists_no_hook(self):
-        self.hook_mock.side_effect = Exception('Failed to connect')
-        self.assertFalse(S3TaskHandler().log_exists(self.remote_log_location))
+        with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook:
+            mock_hook.side_effect = Exception('Failed to connect')
+            
self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
 
     def test_read(self):
+        self.conn.put_object(Bucket='bucket', Key='remote/log/location/1.log', 
Body=b'Log line\n')
         self.assertEqual(
-            S3TaskHandler().read(self.remote_log_location),
-            'content'
+            self.s3_task_handler.read(self.ti),
+            ['*** Reading remote log from 
s3://bucket/remote/log/location/1.log.\nLog line\n\n']
         )
 
-    def test_read_key_empty(self):
-        self.hook_inst_mock.get_key.return_value = None
-        self.assertEqual(S3TaskHandler().read(self.remote_log_location), '')
-
-    def test_read_raises(self):
-        self.hook_inst_mock.get_key.side_effect = Exception('error')
-        self.assertEqual(S3TaskHandler().read(self.remote_log_location), '')
-
     def test_read_raises_return_error(self):
-        self.hook_inst_mock.get_key.side_effect = Exception('error')
-        handler = S3TaskHandler()
+        handler = self.s3_task_handler
+        url = 's3://nonexistentbucket/foo'
         with mock.patch.object(handler.log, 'error') as mock_error:
-            result = handler.s3_log_read(
-                self.remote_log_location,
-                return_error=True
-            )
-            msg = 'Could not read logs from %s' % self.remote_log_location
+            result = handler.s3_read(url, return_error=True)
+            msg = 'Could not read logs from %s' % url
             self.assertEqual(result, msg)
-            mock_error.assert_called_once_with(msg)
+            mock_error.assert_called_once_with(msg, exc_info=True)
 
     def test_write(self):
-        S3TaskHandler().write('text', self.remote_log_location)
-        self.hook_inst_mock.load_string.assert_called_once_with(
-            'content\ntext',
-            key=self.remote_log_location,
-            replace=True,
-            encrypt=False,
-        )
+        with mock.patch.object(self.s3_task_handler.log, 'error') as 
mock_error:
+            self.s3_task_handler.s3_write('text', self.remote_log_location)
+            # We shouldn't expect any error logs in the default working case.
+            mock_error.assert_not_called()
+        body = boto3.resource('s3').Object('bucket', 
self.remote_log_key).get()['Body'].read()
+
+        self.assertEqual(body, b'text')
+
+    def test_write_existing(self):
+        self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, 
Body=b'previous ')
+        self.s3_task_handler.s3_write('text', self.remote_log_location)
+        body = boto3.resource('s3').Object('bucket', 
self.remote_log_key).get()['Body'].read()
+
+        self.assertEqual(body, b'previous \ntext')
 
     def test_write_raises(self):
-        self.hook_inst_mock.read_key.return_value = ''
-        self.hook_inst_mock.load_string.side_effect = Exception('error')
-        handler = S3TaskHandler()
+        handler = self.s3_task_handler
+        url = 's3://nonexistentbucket/foo'
         with mock.patch.object(handler.log, 'error') as mock_error:
-            handler.write('text', self.remote_log_location)
-            msg = 'Could not write logs to %s' % self.remote_log_location
-            mock_error.assert_called_once_with(msg)
+            handler.s3_write('text', url)
+            self.assertEqual
+            mock_error.assert_called_once_with('Could not write logs to %s', 
url, exc_info=True)

Reply via email to