Repository: incubator-airflow Updated Branches: refs/heads/master 81558f3d0 -> 4ce4faaea
[AIRFLOW-1916] Don't upload logs to remote from `run --raw` In a previous change we removed the airflow.task.raw handler (which printed to stdout directly) and replaced it with one that wrote to the log file itself. The problem is that Python automatically calls `logging.shutdown()` itself on clean process exit. This ended up uploading the log file twice: once from the end of `airflow run --raw`, and then again from the explicit shutdown() call at the end of cli's `run()`. Since logging is automatically shut down, this change adds an explicit flag to control whether the GCS and S3 handlers should upload the file or not, and we tell them not to when running with `--raw`. Closes #2880 from ashb/AIRFLOW-1916-dont-upload-logs-twice Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4ce4faae Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4ce4faae Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4ce4faae Branch: refs/heads/master Commit: 4ce4faaeae7a76d97defcf9a9d3304ac9d78b9bd Parents: 81558f3 Author: Ash Berlin-Taylor <[email protected]> Authored: Tue Dec 19 21:04:18 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Tue Dec 19 21:04:18 2017 +0100 ---------------------------------------------------------------------- airflow/bin/cli.py | 7 ++--- airflow/models.py | 6 +++- airflow/utils/log/gcs_task_handler.py | 5 ++++ airflow/utils/log/s3_task_handler.py | 5 ++++ tests/utils/log/test_s3_task_handler.py | 42 ++++++++++++++++++++++++---- 5 files changed, 53 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 3e954dc..e98838d 100755 --- a/airflow/bin/cli.py +++ 
b/airflow/bin/cli.py @@ -362,7 +362,8 @@ def run(args, dag=None): task = dag.get_task(task_id=args.task_id) ti = TaskInstance(task, args.execution_date) ti.refresh_from_db() - ti.init_run_context() + + ti.init_run_context(raw=args.raw) hostname = socket.getfqdn() log.info("Running %s on host %s", ti, hostname) @@ -419,10 +420,6 @@ def run(args, dag=None): executor.heartbeat() executor.end() - # Child processes should not flush or upload to remote - if args.raw: - return - logging.shutdown() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index d69bc57..b2f3bac 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -807,6 +807,9 @@ class TaskInstance(Base, LoggingMixin): self.hostname = '' self.init_on_load() self._log = logging.getLogger("airflow.task") + # Whether this TaskInstance is currently running within `airflow run --raw`. + # Not persisted to the database so only valid for the current process + self.is_raw = False @reconstructor def init_on_load(self): @@ -1879,11 +1882,12 @@ class TaskInstance(Base, LoggingMixin): TI.state == State.RUNNING ).count() - def init_run_context(self): + def init_run_context(self, raw=False): """ Sets the log context. 
""" + self.is_raw = raw  # must be set before _set_context(): handlers read ti.is_raw in set_context() self._set_context(self) class TaskFail(Base): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/airflow/utils/log/gcs_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py index a87d1d4..f68165f 100644 --- a/airflow/utils/log/gcs_task_handler.py +++ b/airflow/utils/log/gcs_task_handler.py @@ -32,6 +32,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): self.log_relative_path = '' self._hook = None self.closed = False + self.upload_on_close = True def _build_hook(self): remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID') @@ -59,6 +60,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): # log path to upload log files into GCS and read from the # remote location. self.log_relative_path = self._render_filename(ti, ti.try_number) + self.upload_on_close = not ti.is_raw def close(self): """ @@ -73,6 +75,9 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): super(GCSTaskHandler, self).close() + if not self.upload_on_close: + return + local_loc = os.path.join(self.local_base, self.log_relative_path) remote_loc = os.path.join(self.remote_base, self.log_relative_path) if os.path.exists(local_loc): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/airflow/utils/log/s3_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index 5ff90c6..b3acf3a 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -30,6 +30,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): self.log_relative_path = '' self._hook = None self.closed = False + self.upload_on_close = True def _build_hook(self): remote_conn_id = configuration.get('core', 
'REMOTE_LOG_CONN_ID') @@ -54,6 +55,7 @@ class S3TaskHandler(FileTaskHandler, 
LoggingMixin): # Local location and remote location is needed to open and # upload local log file to S3 remote storage. self.log_relative_path = self._render_filename(ti, ti.try_number) + self.upload_on_close = not ti.is_raw def close(self): """ @@ -68,6 +70,9 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): super(S3TaskHandler, self).close() + if not self.upload_on_close: + return + local_loc = os.path.join(self.local_base, self.log_relative_path) remote_loc = os.path.join(self.remote_base, self.log_relative_path) if os.path.exists(local_loc): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/tests/utils/log/test_s3_task_handler.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py index dc32b5a..53c1e36 100644 --- a/tests/utils/log/test_s3_task_handler.py +++ b/tests/utils/log/test_s3_task_handler.py @@ -14,9 +14,11 @@ import mock import unittest +import os from airflow import configuration from airflow.utils.log.s3_task_handler import S3TaskHandler +from airflow.utils.state import State from airflow.utils.timezone import datetime from airflow.hooks.S3_hook import S3Hook from airflow.models import TaskInstance, DAG @@ -37,13 +39,14 @@ class TestS3TaskHandler(unittest.TestCase): def setUp(self): super(TestS3TaskHandler, self).setUp() - self.remote_log_location = 's3://bucket/remote/log/location' - self.remote_log_key = 'remote/log/location' + self.remote_log_base = 's3://bucket/remote/log/location' + self.remote_log_location = 's3://bucket/remote/log/location/1.log' + self.remote_log_key = 'remote/log/location/1.log' self.local_log_location = 'local/log/location' self.filename_template = '{try_number}.log' self.s3_task_handler = S3TaskHandler( self.local_log_location, - self.remote_log_location, + self.remote_log_base, self.filename_template ) @@ -53,6 +56,7 @@ class TestS3TaskHandler(unittest.TestCase): task = 
DummyOperator(task_id='task_for_testing_file_log_handler', dag=self.dag) self.ti = TaskInstance(task=task, execution_date=date) self.ti.try_number = 1 + self.ti.state = State.RUNNING self.addCleanup(self.dag.clear) self.conn = boto3.client('s3') @@ -61,6 +65,13 @@ class TestS3TaskHandler(unittest.TestCase): moto.core.moto_api_backend.reset() self.conn.create_bucket(Bucket="bucket") + def tearDown(self): + if self.s3_task_handler.handler: + try: + os.remove(self.s3_task_handler.handler.baseFilename) + except Exception: + pass + def test_hook(self): self.assertIsInstance(self.s3_task_handler.hook, S3Hook) @@ -94,10 +105,11 @@ class TestS3TaskHandler(unittest.TestCase): self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) def test_read(self): - self.conn.put_object(Bucket='bucket', Key='remote/log/location/1.log', Body=b'Log line\n') + self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'Log line\n') self.assertEqual( self.s3_task_handler.read(self.ti), - ['*** Reading remote log from s3://bucket/remote/log/location/1.log.\nLog line\n\n'] + ['*** Reading remote log from s3://bucket/remote/log/location/1.log.\n' + 'Log line\n\n'] ) def test_read_raises_return_error(self): @@ -131,4 +143,22 @@ class TestS3TaskHandler(unittest.TestCase): with mock.patch.object(handler.log, 'error') as mock_error: handler.s3_write('text', url) self.assertEqual - mock_error.assert_called_once_with('Could not write logs to %s', url, exc_info=True) + mock_error.assert_called_once_with( + 'Could not write logs to %s', url, exc_info=True) + + def test_close(self): + self.s3_task_handler.set_context(self.ti) + self.assertTrue(self.s3_task_handler.upload_on_close) + + self.s3_task_handler.close() + # Should not raise + boto3.resource('s3').Object('bucket', self.remote_log_key).get() + + def test_close_no_upload(self): + self.ti.is_raw = True + self.s3_task_handler.set_context(self.ti) + self.assertFalse(self.s3_task_handler.upload_on_close) + 
self.s3_task_handler.close() + + with self.assertRaises(self.conn.exceptions.NoSuchKey): + boto3.resource('s3').Object('bucket', self.remote_log_key).get()
