feluelle commented on a change in pull request #7437: [AIRFLOW-2325] Add CloudwatchTaskHandler option for remote task logging
URL: https://github.com/apache/airflow/pull/7437#discussion_r380592715
 
 

 ##########
 File path: airflow/utils/log/cloudwatch_task_handler.py
 ##########
 @@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import watchtower
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    CloudwatchTaskHandler is a python log handler that handles and reads task 
instance logs.
+
+    It extends airflow FileTaskHandler and uploads to and reads from 
Cloudwatch.
+    """
+    def __init__(self, base_log_folder, log_group, region_name, filename_template):
+        """
+        :param base_log_folder: base folder for local task logs, passed through to FileTaskHandler
+        :param log_group: name of the Cloudwatch log group that task log streams are written to
+            and read from
+        :param region_name: AWS region used when creating the AwsLogsHook and its boto3 session
+        :param filename_template: template for a task's log name; the rendered name (with ':'
+            replaced) is reused as the Cloudwatch log stream name
+        """
+        super().__init__(base_log_folder, filename_template)
+        # Replaced by a watchtower handler for each task instance in set_context().
+        self.handler = None
+        self.log_group = log_group
+        self.region_name = region_name
+        # Guards close() so repeated calls (e.g. from logging.shutdown) do nothing.
+        self.closed = False
+
+    @cached_property
+    def hook(self):
+        """
+        Returns AwsLogsHook.
+        """
+        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
+        try:
+            from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
+            return AwsLogsHook(aws_conn_id=remote_conn_id, 
region_name=self.region_name)
+        except Exception:  # pylint: disable=broad-except
+            self.log.error(
+                'Could not create an AwsLogsHook with connection id "%s". '
+                'Please make sure that airflow[aws] is installed and '
+                'the Cloudwatch logs connection exists.', remote_conn_id
+            )
+
+    def _render_filename(self, ti, try_number):
+        # Replace unsupported log group name characters
+        return super()._render_filename(ti, try_number).replace(':', '_')
+
+    def set_context(self, ti):
+        self.handler = watchtower.CloudWatchLogHandler(
+            log_group=self.log_group,
+            stream_name=self._render_filename(ti, ti.try_number),
+            boto3_session=self.hook.get_session(self.region_name)
+        )
+
+    def close(self):
+        """
+        Close the handler responsible for the upload of the local log file to 
Cloudwatch.
+        """
+        # When application exit, system shuts down all handlers by
+        # calling close method. Here we check if logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        if self.handler is not None:
+            self.handler.close()
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
+
+    def _read(self, task_instance, try_number, metadata=None):
+        stream_name = self._render_filename(task_instance, try_number)
+        return '*** Reading remote log from Cloudwatch log_group: {} 
log_stream: {}.\n{}\n'.format(
+            self.log_group, stream_name, 
self.get_cloudwatch_logs(stream_name=stream_name)
+        ), {'end_of_log': True}
+
+    def get_cloudwatch_logs(self, stream_name):
 
 Review comment:
   Same here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to