turbaszek commented on a change in pull request #9604:
URL: https://github.com/apache/airflow/pull/9604#discussion_r449391557



##########
File path: airflow/utils/log/cloudwatch_task_handler.py
##########
@@ -15,101 +15,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-import watchtower
-from cached_property import cached_property
-
-from airflow.configuration import conf
-from airflow.utils.log.file_task_handler import FileTaskHandler
-from airflow.utils.log.logging_mixin import LoggingMixin
-
-
-class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
-    """
-    CloudwatchTaskHandler is a python log handler that handles and reads task 
instance logs.
-
-    It extends airflow FileTaskHandler and uploads to and reads from 
Cloudwatch.
-
-    :param base_log_folder: base folder to store logs locally
-    :type base_log_folder: str
-    :param log_group_arn: ARN of the Cloudwatch log group for remote log 
storage
-        with format ``arn:aws:logs:{region name}:{account id}:log-group:{group 
name}``
-    :type log_group_arn: str
-    :param filename_template: template for file name (local storage) or log 
stream name (remote)
-    :type filename_template: str
-    """
-    def __init__(self, base_log_folder, log_group_arn, filename_template):
-        super().__init__(base_log_folder, filename_template)
-        split_arn = log_group_arn.split(':')
-
-        self.handler = None
-        self.log_group = split_arn[6]
-        self.region_name = split_arn[3]
-        self.closed = False
-
-    @cached_property
-    def hook(self):
-        """
-        Returns AwsLogsHook.
-        """
-        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
-        try:
-            from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
-            return AwsLogsHook(aws_conn_id=remote_conn_id, 
region_name=self.region_name)
-        except Exception:  # pylint: disable=broad-except
-            self.log.error(
-                'Could not create an AwsLogsHook with connection id "%s". '
-                'Please make sure that airflow[aws] is installed and '
-                'the Cloudwatch logs connection exists.', remote_conn_id
-            )
-
-    def _render_filename(self, ti, try_number):
-        # Replace unsupported log group name characters
-        return super()._render_filename(ti, try_number).replace(':', '_')
-
-    def set_context(self, ti):
-        super().set_context(ti)
-        self.handler = watchtower.CloudWatchLogHandler(
-            log_group=self.log_group,
-            stream_name=self._render_filename(ti, ti.try_number),
-            boto3_session=self.hook.get_session(self.region_name)
-        )
-
-    def close(self):
-        """
-        Close the handler responsible for the upload of the local log file to 
Cloudwatch.
-        """
-        # When application exit, system shuts down all handlers by
-        # calling close method. Here we check if logger is already
-        # closed to prevent uploading the log to remote storage multiple
-        # times when `logging.shutdown` is called.
-        if self.closed:
-            return
-
-        if self.handler is not None:
-            self.handler.close()
-        # Mark closed so we don't double write if close is called twice
-        self.closed = True
-
-    def _read(self, task_instance, try_number, metadata=None):
-        stream_name = self._render_filename(task_instance, try_number)
-        return '*** Reading remote log from Cloudwatch log_group: {} 
log_stream: {}.\n{}\n'.format(
-            self.log_group, stream_name, 
self.get_cloudwatch_logs(stream_name=stream_name)
-        ), {'end_of_log': True}
-
-    def get_cloudwatch_logs(self, stream_name):
-        """
-        Return all logs from the given log stream.
-
-        :param stream_name: name of the Cloudwatch log stream to get all logs 
from
-        :return: string of all logs from the given log stream
-        """
-        try:
-            events = list(self.hook.get_log_events(log_group=self.log_group, 
log_stream_name=stream_name))
-            return '\n'.join(reversed([event['message'] for event in events]))
-        except Exception:  # pylint: disable=broad-except
-            msg = 'Could not read remote logs from log_group: {} log_stream: 
{}.'.format(
-                self.log_group, stream_name
-            )
-            self.log.exception(msg)
-            return msg
+"""
+This module is deprecated. Please use 
`airflow.providers.amazon.aws.log.cloudwatch_task_handler`.
+"""
+import warnings
+
+# pylint: disable=unused-import
+from airflow.providers.amazon.aws.log.cloudwatch_task_handler import 
CloudwatchTaskHandler  # noqa
+
+warnings.warn(
+    "This module is deprecated. Please use 
`airflow.providers.amazon.aws.log.cloudwatch_task_handler`.",
+    DeprecationWarning, stacklevel=2
+)

Review comment:
       Would you mind adding a test in `test_core_to_contrib.py`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to