Repository: incubator-airflow
Updated Branches:
  refs/heads/master bc25d593c -> 96206b0e5
[AIRFLOW-1676] Make GCSTaskHandler write to GCS on close

Closes #2659 from criccomini/AIRFLOW-1676

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/96206b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/96206b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/96206b0e

Branch: refs/heads/master
Commit: 96206b0e5886945bc7ac5436bc7139daf14570d9
Parents: bc25d59
Author: Chris Riccomini <[email protected]>
Authored: Wed Oct 4 11:28:33 2017 +0200
Committer: Bolke de Bruin <[email protected]>
Committed: Wed Oct 4 11:28:33 2017 +0200

----------------------------------------------------------------------
 airflow/utils/log/gcs_task_handler.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/96206b0e/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index dcdaf6d..bb40e11 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -31,6 +31,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         self.remote_base = gcs_log_folder
         self.log_relative_path = ''
         self._hook = None
+        self.closed = False
 
     def _build_hook(self):
         remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
@@ -67,7 +68,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         # calling close method. Here we check if logger is already
         # closed to prevent uploading the log to remote storage multiple
         # times when `logging.shutdown` is called.
-        if self._hook is None:
+        if self.closed:
             return
 
         super(GCSTaskHandler, self).close()
@@ -80,8 +81,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
                 log = logfile.read()
             self.gcs_write(log, remote_loc)
 
-        # Unset variable
-        self._hook = None
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
 
     def _read(self, ti, try_number):
         """
@@ -153,8 +154,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         :type append: bool
         """
         if append:
-            old_log = self.read(remote_log_location)
-            log = '\n'.join([old_log, log])
+            old_log = self.gcs_read(remote_log_location)
+            log = '\n'.join([old_log, log]) if old_log else log
 
         try:
             bkt, blob = self.parse_gcs_url(remote_log_location)
