Repository: incubator-airflow
Updated Branches:
  refs/heads/master a9ceca5e0 -> 06b41fbe1
[AIRFLOW-1869] Write more error messages into gcs and file logs Closes #2826 from wrp/gcs-log Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/06b41fbe Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/06b41fbe Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/06b41fbe Branch: refs/heads/master Commit: 06b41fbe1bf94ca2013fe164ee275f9fbac92973 Parents: a9ceca5 Author: William Pursell <[email protected]> Authored: Tue Dec 5 11:24:35 2017 -0800 Committer: Chris Riccomini <[email protected]> Committed: Tue Dec 5 11:24:35 2017 -0800 ---------------------------------------------------------------------- airflow/utils/log/file_task_handler.py | 10 +++--- airflow/utils/log/gcs_task_handler.py | 54 +++++++++-------------------- tests/www/test_views.py | 2 +- 3 files changed, 24 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b41fbe/airflow/utils/log/file_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 6038fbf..f131c09 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -97,9 +97,11 @@ class FileTaskHandler(logging.Handler): if os.path.exists(location): try: with open(location) as f: - log += "*** Reading local log.\n" + "".join(f.readlines()) + log += "*** Reading local file: {}\n".format(location) + log += "".join(f.readlines()) except Exception as e: - log = "*** Failed to load local log file: {}. 
{}\n".format(location, str(e)) + log = "*** Failed to load local log file: {}\n".format(location) + log += "*** {}\n".format(str(e)) else: url = os.path.join( "http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path @@ -107,8 +109,8 @@ class FileTaskHandler(logging.Handler): ti=ti, worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT') ) - log += "*** Log file isn't local.\n" - log += "*** Fetching here: {url}\n".format(**locals()) + log += "*** Log file does not exist: {}\n".format(location) + log += "*** Fetching from: {}\n".format(url) try: timeout = None # No timeout try: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b41fbe/airflow/utils/log/gcs_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py index c11e7ad..b556cf0 100644 --- a/airflow/utils/log/gcs_task_handler.py +++ b/airflow/utils/log/gcs_task_handler.py @@ -40,11 +40,11 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): return GoogleCloudStorageHook( google_cloud_storage_conn_id=remote_conn_id ) - except: + except Exception as e: self.log.error( 'Could not create a GoogleCloudStorageHook with connection id ' - '"%s". Please make sure that airflow[gcp_api] is installed ' - 'and the GCS connection exists.', remote_conn_id + '"{}". {}\n\nPlease make sure that airflow[gcp_api] is installed ' + 'and the GCS connection exists.'.format(remote_conn_id, str(e)) ) @property @@ -97,49 +97,26 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): log_relative_path = self._render_filename(ti, try_number + 1) remote_loc = os.path.join(self.remote_base, log_relative_path) - if self.gcs_log_exists(remote_loc): - # If GCS remote file exists, we do not fetch logs from task instance - # local machine even if there are errors reading remote logs, as - # remote_log will contain error message. 
- remote_log = self.gcs_read(remote_loc, return_error=True) + try: + remote_log = self.gcs_read(remote_loc) log = '*** Reading remote log from {}.\n{}\n'.format( remote_loc, remote_log) - else: - log = super(GCSTaskHandler, self)._read(ti, try_number) + except Exception as e: + log = '*** Unable to read remote log from {}\n*** {}\n\n'.format( + remote_loc, str(e)) + self.log.error(log) + log += super(GCSTaskHandler, self)._read(ti, try_number) return log - def gcs_log_exists(self, remote_log_location): - """ - Check if remote_log_location exists in remote storage - :param remote_log_location: log's location in remote storage - :return: True if location exists else False - """ - try: - bkt, blob = self.parse_gcs_url(remote_log_location) - return self.hook.exists(bkt, blob) - except Exception: - pass - return False - - def gcs_read(self, remote_log_location, return_error=False): + def gcs_read(self, remote_log_location): """ Returns the log found at the remote_log_location. :param remote_log_location: the log's location in remote storage :type remote_log_location: string (path) - :param return_error: if True, returns a string error message if an - error occurs. Otherwise returns '' when an error occurs. 
- :type return_error: bool """ - try: - bkt, blob = self.parse_gcs_url(remote_log_location) - return self.hook.download(bkt, blob).decode() - except: - # return error if needed - if return_error: - msg = 'Could not read logs from {}'.format(remote_log_location) - self.log.error(msg) - return msg + bkt, blob = self.parse_gcs_url(remote_log_location) + return self.hook.download(bkt, blob).decode() def gcs_write(self, log, remote_log_location, append=True): """ @@ -154,7 +131,10 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): :type append: bool """ if append: - old_log = self.gcs_read(remote_log_location) + try: + old_log = self.gcs_read(remote_log_location) + except Exception as e: + old_log = '*** Previous log discarded: {}\n\n'.format(str(e)) log = '\n'.join([old_log, log]) if old_log else log try: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b41fbe/tests/www/test_views.py ---------------------------------------------------------------------- diff --git a/tests/www/test_views.py b/tests/www/test_views.py index f5b015e..0051848 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -374,7 +374,7 @@ class TestLogView(unittest.TestCase): follow_redirects=True, ) self.assertEqual(response.status_code, 200) - self.assertIn('Log file isn', + self.assertIn('Log file does not exist', response.data.decode('utf-8'))
