This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new a79e2d4  Move provider's log task handlers to the provider package 
(#9604)
a79e2d4 is described below

commit a79e2d4c4aa105f3fac5ae6a28e29af9cd572407
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Jul 6 08:05:40 2020 +0100

    Move provider's log task handlers to the provider package (#9604)
---
 UPDATING.md                                        |  14 +-
 airflow/config_templates/airflow_local_settings.py |   4 +-
 .../amazon/aws}/log/cloudwatch_task_handler.py     |   0
 airflow/providers/elasticsearch/log/__init__.py    |  16 ++
 .../elasticsearch}/log/es_task_handler.py          |   5 +-
 airflow/providers/google/cloud/log/__init__.py     |  16 ++
 .../google/cloud}/log/stackdriver_task_handler.py  |   6 +-
 airflow/utils/log/cloudwatch_task_handler.py       | 110 +-------
 airflow/utils/log/es_task_handler.py               | 287 +--------------------
 airflow/utils/log/stackdriver_task_handler.py      | 283 +-------------------
 docs/autoapi_templates/index.rst                   |   2 +
 tests/deprecated_classes.py                        |  21 +-
 .../aws}/log/test_cloudwatch_task_handler.py       |   0
 tests/providers/elasticsearch/log/__init__.py      |  16 ++
 .../elasticsearch}/log/elasticmock/__init__.py     |   0
 .../log/elasticmock/fake_elasticsearch.py          |   0
 .../log/elasticmock/utilities/__init__.py          |   0
 .../elasticsearch}/log/test_es_task_handler.py     |   2 +-
 tests/providers/google/cloud/log/__init__.py       |  16 ++
 .../cloud}/log/test_stackdriver_task_handler.py    |  42 +--
 20 files changed, 161 insertions(+), 679 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 5945f7a..8d6f624 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -61,9 +61,21 @@ More tips can be found in the guide:
 https://developers.google.com/style/inclusive-documentation
 
 -->
+### StackdriverTaskHandler has been moved
+The `StackdriverTaskHandler` class from 
`airflow.utils.log.stackdriver_task_handler` has been moved to
+`airflow.providers.google.cloud.log.stackdriver_task_handler`. This is because 
it has items specific to `google cloud`.
+
 ### S3TaskHandler has been moved
 The `S3TaskHandler` class from `airflow.utils.log.s3_task_handler` has been 
moved to
-`airflow.providers.amazon.aws.log.s3_task_handler`. This is because it has 
items specific to `aws`
+`airflow.providers.amazon.aws.log.s3_task_handler`. This is because it has 
items specific to `aws`.
+
+### ElasticsearchTaskHandler has been moved
+The `ElasticsearchTaskHandler` class from `airflow.utils.log.es_task_handler` 
has been moved to
+`airflow.providers.elasticsearch.log.es_task_handler`. This is because it has 
items specific to `elasticsearch`.
+
+### CloudwatchTaskHandler has been  moved
+The `CloudwatchTaskHandler` class from 
`airflow.utils.log.cloudwatch_task_handler` has been moved to
+`airflow.providers.amazon.aws.log.cloudwatch_task_handler`. This is because it 
has items specific to `aws`.
 
 ### SendGrid emailer has been moved
 Formerly the core code was maintained by the original creators - Airbnb. The 
code that was in the contrib
diff --git a/airflow/config_templates/airflow_local_settings.py 
b/airflow/config_templates/airflow_local_settings.py
index 0e92c84..f927a4a 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -223,7 +223,7 @@ if REMOTE_LOGGING:
         log_name = urlparse(REMOTE_BASE_LOG_FOLDER).path[1:]
         STACKDRIVER_REMOTE_HANDLERS = {
             'task': {
-                'class': 
'airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler',
+                'class': 
'airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler',
                 'formatter': 'airflow',
                 'name': log_name,
                 'gcp_key_path': key_path
@@ -241,7 +241,7 @@ if REMOTE_LOGGING:
 
         ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
             'task': {
-                'class': 
'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
+                'class': 
'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler',
                 'formatter': 'airflow',
                 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
                 'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE,
diff --git a/airflow/utils/log/cloudwatch_task_handler.py 
b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
similarity index 100%
copy from airflow/utils/log/cloudwatch_task_handler.py
copy to airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
diff --git a/airflow/providers/elasticsearch/log/__init__.py 
b/airflow/providers/elasticsearch/log/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/elasticsearch/log/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/utils/log/es_task_handler.py 
b/airflow/providers/elasticsearch/log/es_task_handler.py
similarity index 98%
copy from airflow/utils/log/es_task_handler.py
copy to airflow/providers/elasticsearch/log/es_task_handler.py
index b6e1687..4ff7a88 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -31,10 +31,10 @@ from airflow.utils import timezone
 from airflow.utils.helpers import parse_template_string
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.json_formatter import JSONFormatter
-from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
-class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin, 
ExternalLoggingMixin):
+class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
     """
     ElasticsearchTaskHandler is a python log handler that
     reads logs from Elasticsearch. Note logs are not directly
@@ -270,6 +270,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
LoggingMixin, ExternalLoggingMix
 
     @property
     def log_name(self):
+        """ The log name"""
         return self.LOG_NAME
 
     def get_external_log_url(self, task_instance: TaskInstance, try_number: 
int) -> str:
diff --git a/airflow/providers/google/cloud/log/__init__.py 
b/airflow/providers/google/cloud/log/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/google/cloud/log/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/utils/log/stackdriver_task_handler.py 
b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
similarity index 98%
copy from airflow/utils/log/stackdriver_task_handler.py
copy to airflow/providers/google/cloud/log/stackdriver_task_handler.py
index 4865fb3..87ed20e 100644
--- a/airflow/utils/log/stackdriver_task_handler.py
+++ b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -61,7 +61,7 @@ class StackdriverTaskHandler(logging.Handler):
     :type scopes: Sequence[str]
     :param name: the name of the custom log in Stackdriver Logging. Defaults
         to 'airflow'. The name of the Python logger will be represented
-         in the ``python_logger`` field.
+        in the ``python_logger`` field.
     :type name: str
     :param transport: Class for creating new transport objects. It should
         extend from the base :class:`google.cloud.logging.handlers.Transport` 
type and
@@ -144,7 +144,7 @@ class StackdriverTaskHandler(logging.Handler):
         Configures the logger to add information with information about the 
current task
 
         :param task_instance: Currently executed task
-        :type task_instance: TaskInstance
+        :type task_instance:  :class:`airflow.models.TaskInstance`
         """
         self.task_instance_labels = 
self._task_instance_to_labels(task_instance)
 
@@ -155,7 +155,7 @@ class StackdriverTaskHandler(logging.Handler):
         Read logs of given task instance from Stackdriver logging.
 
         :param task_instance: task instance object
-        :type: task_instance: TaskInstance
+        :type task_instance: :class:`airflow.models.TaskInstance`
         :param try_number: task instance try_number to read logs from. If None
            it returns all logs
         :type try_number: Optional[int]
diff --git a/airflow/utils/log/cloudwatch_task_handler.py 
b/airflow/utils/log/cloudwatch_task_handler.py
index 2d22452..f468a89 100644
--- a/airflow/utils/log/cloudwatch_task_handler.py
+++ b/airflow/utils/log/cloudwatch_task_handler.py
@@ -15,101 +15,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-import watchtower
-from cached_property import cached_property
-
-from airflow.configuration import conf
-from airflow.utils.log.file_task_handler import FileTaskHandler
-from airflow.utils.log.logging_mixin import LoggingMixin
-
-
-class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
-    """
-    CloudwatchTaskHandler is a python log handler that handles and reads task 
instance logs.
-
-    It extends airflow FileTaskHandler and uploads to and reads from 
Cloudwatch.
-
-    :param base_log_folder: base folder to store logs locally
-    :type base_log_folder: str
-    :param log_group_arn: ARN of the Cloudwatch log group for remote log 
storage
-        with format ``arn:aws:logs:{region name}:{account id}:log-group:{group 
name}``
-    :type log_group_arn: str
-    :param filename_template: template for file name (local storage) or log 
stream name (remote)
-    :type filename_template: str
-    """
-    def __init__(self, base_log_folder, log_group_arn, filename_template):
-        super().__init__(base_log_folder, filename_template)
-        split_arn = log_group_arn.split(':')
-
-        self.handler = None
-        self.log_group = split_arn[6]
-        self.region_name = split_arn[3]
-        self.closed = False
-
-    @cached_property
-    def hook(self):
-        """
-        Returns AwsLogsHook.
-        """
-        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
-        try:
-            from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
-            return AwsLogsHook(aws_conn_id=remote_conn_id, 
region_name=self.region_name)
-        except Exception:  # pylint: disable=broad-except
-            self.log.error(
-                'Could not create an AwsLogsHook with connection id "%s". '
-                'Please make sure that airflow[aws] is installed and '
-                'the Cloudwatch logs connection exists.', remote_conn_id
-            )
-
-    def _render_filename(self, ti, try_number):
-        # Replace unsupported log group name characters
-        return super()._render_filename(ti, try_number).replace(':', '_')
-
-    def set_context(self, ti):
-        super().set_context(ti)
-        self.handler = watchtower.CloudWatchLogHandler(
-            log_group=self.log_group,
-            stream_name=self._render_filename(ti, ti.try_number),
-            boto3_session=self.hook.get_session(self.region_name)
-        )
-
-    def close(self):
-        """
-        Close the handler responsible for the upload of the local log file to 
Cloudwatch.
-        """
-        # When application exit, system shuts down all handlers by
-        # calling close method. Here we check if logger is already
-        # closed to prevent uploading the log to remote storage multiple
-        # times when `logging.shutdown` is called.
-        if self.closed:
-            return
-
-        if self.handler is not None:
-            self.handler.close()
-        # Mark closed so we don't double write if close is called twice
-        self.closed = True
-
-    def _read(self, task_instance, try_number, metadata=None):
-        stream_name = self._render_filename(task_instance, try_number)
-        return '*** Reading remote log from Cloudwatch log_group: {} 
log_stream: {}.\n{}\n'.format(
-            self.log_group, stream_name, 
self.get_cloudwatch_logs(stream_name=stream_name)
-        ), {'end_of_log': True}
-
-    def get_cloudwatch_logs(self, stream_name):
-        """
-        Return all logs from the given log stream.
-
-        :param stream_name: name of the Cloudwatch log stream to get all logs 
from
-        :return: string of all logs from the given log stream
-        """
-        try:
-            events = list(self.hook.get_log_events(log_group=self.log_group, 
log_stream_name=stream_name))
-            return '\n'.join(reversed([event['message'] for event in events]))
-        except Exception:  # pylint: disable=broad-except
-            msg = 'Could not read remote logs from log_group: {} log_stream: 
{}.'.format(
-                self.log_group, stream_name
-            )
-            self.log.exception(msg)
-            return msg
+"""
+This module is deprecated. Please use 
`airflow.providers.amazon.aws.log.cloudwatch_task_handler`.
+"""
+import warnings
+
+# pylint: disable=unused-import
+from airflow.providers.amazon.aws.log.cloudwatch_task_handler import 
CloudwatchTaskHandler  # noqa
+
+warnings.warn(
+    "This module is deprecated. Please use 
`airflow.providers.amazon.aws.log.cloudwatch_task_handler`.",
+    DeprecationWarning, stacklevel=2
+)
diff --git a/airflow/utils/log/es_task_handler.py 
b/airflow/utils/log/es_task_handler.py
index b6e1687..2a0a19f 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -15,278 +15,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-import logging
-import sys
-from urllib.parse import quote
-
-# Using `from elasticsearch import *` would break elasticsearch mocking used 
in unit test.
-import elasticsearch
-import pendulum
-from elasticsearch_dsl import Search
-
-from airflow.configuration import conf
-from airflow.models import TaskInstance
-from airflow.utils import timezone
-from airflow.utils.helpers import parse_template_string
-from airflow.utils.log.file_task_handler import FileTaskHandler
-from airflow.utils.log.json_formatter import JSONFormatter
-from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
-
-
-class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin, 
ExternalLoggingMixin):
-    """
-    ElasticsearchTaskHandler is a python log handler that
-    reads logs from Elasticsearch. Note logs are not directly
-    indexed into Elasticsearch. Instead, it flushes logs
-    into local files. Additional software setup is required
-    to index the log into Elasticsearch, such as using
-    Filebeat and Logstash.
-    To efficiently query and sort Elasticsearch results, we assume each
-    log message has a field `log_id` consists of ti primary keys:
-    `log_id = {dag_id}-{task_id}-{execution_date}-{try_number}`
-    Log messages with specific log_id are sorted based on `offset`,
-    which is a unique integer indicates log message's order.
-    Timestamp here are unreliable because multiple log messages
-    might have the same timestamp.
-    """
-
-    PAGE = 0
-    MAX_LINE_PER_PAGE = 1000
-    LOG_NAME = 'Elasticsearch'
-
-    def __init__(self, base_log_folder, filename_template,  # pylint: 
disable=too-many-arguments
-                 log_id_template, end_of_log_mark,
-                 write_stdout, json_format, json_fields,
-                 host='localhost:9200',
-                 frontend='localhost:5601',
-                 es_kwargs=conf.getsection("elasticsearch_configs")):
-        """
-        :param base_log_folder: base folder to store logs locally
-        :param log_id_template: log id template
-        :param host: Elasticsearch host name
-        """
-        es_kwargs = es_kwargs or {}
-        super().__init__(
-            base_log_folder, filename_template)
-        self.closed = False
-
-        self.log_id_template, self.log_id_jinja_template = \
-            parse_template_string(log_id_template)
-
-        self.client = elasticsearch.Elasticsearch([host], **es_kwargs)
-
-        self.frontend = frontend
-        self.mark_end_on_close = True
-        self.end_of_log_mark = end_of_log_mark
-        self.write_stdout = write_stdout
-        self.json_format = json_format
-        self.json_fields = [label.strip() for label in json_fields.split(",")]
-        self.handler = None
-        self.context_set = False
-
-    def _render_log_id(self, ti, try_number):
-        if self.log_id_jinja_template:
-            jinja_context = ti.get_template_context()
-            jinja_context['try_number'] = try_number
-            return self.log_id_jinja_template.render(**jinja_context)
-
-        if self.json_format:
-            execution_date = self._clean_execution_date(ti.execution_date)
-        else:
-            execution_date = ti.execution_date.isoformat()
-        return self.log_id_template.format(dag_id=ti.dag_id,
-                                           task_id=ti.task_id,
-                                           execution_date=execution_date,
-                                           try_number=try_number)
-
-    @staticmethod
-    def _clean_execution_date(execution_date):
-        """
-        Clean up an execution date so that it is safe to query in elasticsearch
-        by removing reserved characters.
-        # 
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
-
-        :param execution_date: execution date of the dag run.
-        """
-        return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f")
-
-    def _read(self, ti, try_number, metadata=None):
-        """
-        Endpoint for streaming log.
-
-        :param ti: task instance object
-        :param try_number: try_number of the task instance
-        :param metadata: log metadata,
-                         can be used for steaming log reading and auto-tailing.
-        :return: a list of log documents and metadata.
-        """
-        if not metadata:
-            metadata = {'offset': 0}
-        if 'offset' not in metadata:
-            metadata['offset'] = 0
-
-        offset = metadata['offset']
-        log_id = self._render_log_id(ti, try_number)
-
-        logs = self.es_read(log_id, offset, metadata)
-
-        next_offset = offset if not logs else logs[-1].offset
-
-        # Ensure a string here. Large offset numbers will get JSON.parsed 
incorrectly
-        # on the client. Sending as a string prevents this issue.
-        # 
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
-        metadata['offset'] = str(next_offset)
-
-        # end_of_log_mark may contain characters like '\n' which is needed to
-        # have the log uploaded but will not be stored in elasticsearch.
-        metadata['end_of_log'] = False if not logs \
-            else logs[-1].message == self.end_of_log_mark.strip()
-
-        cur_ts = pendulum.now()
-        # Assume end of log after not receiving new log for 5 min,
-        # as executor heartbeat is 1 min and there might be some
-        # delay before Elasticsearch makes the log available.
-        if 'last_log_timestamp' in metadata:
-            last_log_ts = timezone.parse(metadata['last_log_timestamp'])
-            if cur_ts.diff(last_log_ts).in_minutes() >= 5 or 'max_offset' in 
metadata \
-                    and offset >= metadata['max_offset']:
-                metadata['end_of_log'] = True
-
-        if offset != next_offset or 'last_log_timestamp' not in metadata:
-            metadata['last_log_timestamp'] = str(cur_ts)
-
-        # If we hit the end of the log, remove the actual end_of_log message
-        # to prevent it from showing in the UI.
-        i = len(logs) if not metadata['end_of_log'] else len(logs) - 1
-        message = '\n'.join([log.message for log in logs[0:i]])
-
-        return message, metadata
-
-    def es_read(self, log_id, offset, metadata):
-        """
-        Returns the logs matching log_id in Elasticsearch and next offset.
-        Returns '' if no log is found or there was an error.
-
-        :param log_id: the log_id of the log to read.
-        :type log_id: str
-        :param offset: the offset start to read log from.
-        :type offset: str
-        :param metadata: log metadata, used for steaming log download.
-        :type metadata: dict
-        """
-
-        # Offset is the unique key for sorting logs given log_id.
-        search = Search(using=self.client) \
-            .query('match_phrase', log_id=log_id) \
-            .sort('offset')
-
-        search = search.filter('range', offset={'gt': int(offset)})
-        max_log_line = search.count()
-        if 'download_logs' in metadata and metadata['download_logs'] and 
'max_offset' not in metadata:
-            try:
-                if max_log_line > 0:
-                    metadata['max_offset'] = search[max_log_line - 
1].execute()[-1].offset
-                else:
-                    metadata['max_offset'] = 0
-            except Exception:  # pylint: disable=broad-except
-                self.log.exception('Could not get current log size with 
log_id: %s', log_id)
-
-        logs = []
-        if max_log_line != 0:
-            try:
-
-                logs = search[self.MAX_LINE_PER_PAGE * 
self.PAGE:self.MAX_LINE_PER_PAGE] \
-                    .execute()
-            except Exception as e:  # pylint: disable=broad-except
-                self.log.exception('Could not read log with log_id: %s, error: 
%s', log_id, str(e))
-
-        return logs
-
-    def set_context(self, ti):
-        """
-        Provide task_instance context to airflow task handler.
-
-        :param ti: task instance object
-        """
-        self.mark_end_on_close = not ti.raw
-
-        if self.json_format:
-            self.formatter = JSONFormatter(
-                json_fields=self.json_fields,
-                extras={
-                    'dag_id': str(ti.dag_id),
-                    'task_id': str(ti.task_id),
-                    'execution_date': 
self._clean_execution_date(ti.execution_date),
-                    'try_number': str(ti.try_number)
-                })
-
-        if self.write_stdout:
-            if self.context_set:
-                # We don't want to re-set up the handler if this logger has
-                # already been initialized
-                return
-
-            self.handler = logging.StreamHandler(stream=sys.__stdout__)
-            self.handler.setLevel(self.level)
-            self.handler.setFormatter(self.formatter)
-        else:
-            super().set_context(ti)
-        self.context_set = True
-
-    def close(self):
-        # When application exit, system shuts down all handlers by
-        # calling close method. Here we check if logger is already
-        # closed to prevent uploading the log to remote storage multiple
-        # times when `logging.shutdown` is called.
-        if self.closed:
-            return
-
-        if not self.mark_end_on_close:
-            self.closed = True
-            return
-
-        # Case which context of the handler was not set.
-        if self.handler is None:
-            self.closed = True
-            return
-
-        # Reopen the file stream, because FileHandler.close() would be called
-        # first in logging.shutdown() and the stream in it would be set to 
None.
-        if self.handler.stream is None or self.handler.stream.closed:
-            self.handler.stream = self.handler._open()  # pylint: 
disable=protected-access
-
-        # Mark the end of file using end of log mark,
-        # so we know where to stop while auto-tailing.
-        self.handler.stream.write(self.end_of_log_mark)
-
-        if self.write_stdout:
-            self.handler.close()
-            sys.stdout = sys.__stdout__
-
-        super().close()
-
-        self.closed = True
-
-    @property
-    def log_name(self):
-        return self.LOG_NAME
-
-    def get_external_log_url(self, task_instance: TaskInstance, try_number: 
int) -> str:
-        """
-        Creates an address for an external log collecting service.
-
-        :param task_instance: task instance object
-        :type: task_instance: TaskInstance
-        :param try_number: task instance try_number to read logs from.
-        :type try_number: Optional[int]
-        :return: URL to the external log collection service
-        :rtype: str
-        """
-        log_id = self.log_id_template.format(
-            dag_id=task_instance.dag_id,
-            task_id=task_instance.task_id,
-            execution_date=task_instance.execution_date,
-            try_number=try_number)
-        url = 'https://' + self.frontend.format(log_id=quote(log_id))
-        return url
+"""
+This module is deprecated. Please use 
`airflow.providers.elasticsearch.log.es_task_handler`.
+"""
+import warnings
+
+# pylint: disable=unused-import
+from airflow.providers.elasticsearch.log.es_task_handler import 
ElasticsearchTaskHandler  # noqa
+
+warnings.warn(
+    "This module is deprecated. Please use 
`airflow.providers.elasticsearch.log.es_task_handler`.",
+    DeprecationWarning, stacklevel=2
+)
diff --git a/airflow/utils/log/stackdriver_task_handler.py 
b/airflow/utils/log/stackdriver_task_handler.py
index 4865fb3..903a64f 100644
--- a/airflow/utils/log/stackdriver_task_handler.py
+++ b/airflow/utils/log/stackdriver_task_handler.py
@@ -15,281 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Handler that integrates with Stackdriver
+This module is deprecated. Please use 
`airflow.providers.google.cloud.log.stackdriver_task_handler`.
 """
-import logging
-from typing import Collection, Dict, List, Optional, Tuple, Type
+import warnings
 
-from cached_property import cached_property
-from google.api_core.gapic_v1.client_info import ClientInfo
-from google.cloud import logging as gcp_logging
-from google.cloud.logging.handlers.transports import 
BackgroundThreadTransport, Transport
-from google.cloud.logging.resource import Resource
+# pylint: disable=unused-import
+from airflow.providers.google.cloud.log.stackdriver_task_handler import 
StackdriverTaskHandler  # noqa
 
-from airflow import version
-from airflow.models import TaskInstance
-from airflow.providers.google.cloud.utils.credentials_provider import 
get_credentials_and_project_id
-
-DEFAULT_LOGGER_NAME = "airflow"
-_GLOBAL_RESOURCE = Resource(type="global", labels={})
-
-_DEFAULT_SCOPESS = frozenset([
-    "https://www.googleapis.com/auth/logging.read";,
-    "https://www.googleapis.com/auth/logging.write";
-])
-
-
-class StackdriverTaskHandler(logging.Handler):
-    """Handler that directly makes Stackdriver logging API calls.
-
-    This is a Python standard ``logging`` handler using that can be used to
-    route Python standard logging messages directly to the Stackdriver
-    Logging API.
-
-    It can also be used to save logs for executing tasks. To do this, you 
should set as a handler with
-    the name "tasks". In this case, it will also be used to read the log for 
display in Web UI.
-
-    This handler supports both an asynchronous and synchronous transport.
-
-
-    :param gcp_key_path: Path to GCP Credential JSON file.
-        If ommited, authorization based on `the Application Default Credentials
-        
<https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__
 will
-        be used.
-    :type gcp_key_path: str
-    :param scopes: OAuth scopes for the credentials,
-    :type scopes: Sequence[str]
-    :param name: the name of the custom log in Stackdriver Logging. Defaults
-        to 'airflow'. The name of the Python logger will be represented
-         in the ``python_logger`` field.
-    :type name: str
-    :param transport: Class for creating new transport objects. It should
-        extend from the base :class:`google.cloud.logging.handlers.Transport` 
type and
-        implement :meth`google.cloud.logging.handlers.Transport.send`. 
Defaults to
-        :class:`google.cloud.logging.handlers.BackgroundThreadTransport`. The 
other
-        option is :class:`google.cloud.logging.handlers.SyncTransport`.
-    :type transport: :class:`type`
-    :param resource: (Optional) Monitored resource of the entry, defaults
-                     to the global resource type.
-    :type resource: :class:`~google.cloud.logging.resource.Resource`
-    :param labels: (Optional) Mapping of labels for the entry.
-    :type labels: dict
-    """
-
-    LABEL_TASK_ID = "task_id"
-    LABEL_DAG_ID = "dag_id"
-    LABEL_EXECUTION_DATE = "execution_date"
-    LABEL_TRY_NUMBER = "try_number"
-
-    def __init__(
-        self,
-        gcp_key_path: Optional[str] = None,
-        # See: https://github.com/PyCQA/pylint/issues/2377
-        scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,  # pylint: 
disable=unsubscriptable-object
-        name: str = DEFAULT_LOGGER_NAME,
-        transport: Type[Transport] = BackgroundThreadTransport,
-        resource: Resource = _GLOBAL_RESOURCE,
-        labels: Optional[Dict[str, str]] = None,
-    ):
-        super().__init__()
-        self.gcp_key_path: Optional[str] = gcp_key_path
-        # See: https://github.com/PyCQA/pylint/issues/2377
-        self.scopes: Optional[Collection[str]] = scopes  # pylint: 
disable=unsubscriptable-object
-        self.name: str = name
-        self.transport_type: Type[Transport] = transport
-        self.resource: Resource = resource
-        self.labels: Optional[Dict[str, str]] = labels
-        self.task_instance_labels: Optional[Dict[str, str]] = {}
-
-    @cached_property
-    def _client(self) -> gcp_logging.Client:
-        """Google Cloud Library API client"""
-        credentials, _ = get_credentials_and_project_id(
-            key_path=self.gcp_key_path,
-            scopes=self.scopes,
-        )
-        client = gcp_logging.Client(
-            credentials=credentials,
-            client_info=ClientInfo(client_library_version='airflow_v' + 
version.version)
-        )
-        return client
-
-    @cached_property
-    def _transport(self) -> Transport:
-        """Object responsible for sending data to Stackdriver"""
-        return self.transport_type(self._client, self.name)
-
-    def emit(self, record: logging.LogRecord) -> None:
-        """Actually log the specified logging record.
-
-        :param record: The record to be logged.
-        :type record: logging.LogRecord
-        """
-        message = self.format(record)
-        labels: Optional[Dict[str, str]]
-        if self.labels and self.task_instance_labels:
-            labels = {}
-            labels.update(self.labels)
-            labels.update(self.task_instance_labels)
-        elif self.labels:
-            labels = self.labels
-        elif self.task_instance_labels:
-            labels = self.task_instance_labels
-        else:
-            labels = None
-        self._transport.send(record, message, resource=self.resource, 
labels=labels)
-
-    def set_context(self, task_instance: TaskInstance) -> None:
-        """
-        Configures the logger to add information with information about the 
current task
-
-        :param task_instance: Currently executed task
-        :type task_instance: TaskInstance
-        """
-        self.task_instance_labels = 
self._task_instance_to_labels(task_instance)
-
-    def read(
-        self, task_instance: TaskInstance, try_number: Optional[int] = None, 
metadata: Optional[Dict] = None
-    ) -> Tuple[List[str], List[Dict]]:
-        """
-        Read logs of given task instance from Stackdriver logging.
-
-        :param task_instance: task instance object
-        :type: task_instance: TaskInstance
-        :param try_number: task instance try_number to read logs from. If None
-           it returns all logs
-        :type try_number: Optional[int]
-        :param metadata: log metadata. It is used for steaming log reading and 
auto-tailing.
-        :type metadata: Dict
-        :return: a tuple of list of logs and list of metadata
-        :rtype: Tuple[List[str], List[Dict]]
-        """
-        if try_number is not None and try_number < 1:
-            logs = ["Error fetching the logs. Try number {} is 
invalid.".format(try_number)]
-            return logs, [{"end_of_log": "true"}]
-
-        if not metadata:
-            metadata = {}
-
-        ti_labels = self._task_instance_to_labels(task_instance)
-
-        if try_number is not None:
-            ti_labels[self.LABEL_TRY_NUMBER] = str(try_number)
-        else:
-            del ti_labels[self.LABEL_TRY_NUMBER]
-
-        log_filter = self._prepare_log_filter(ti_labels)
-        next_page_token = metadata.get("next_page_token", None)
-        all_pages = 'download_logs' in metadata and metadata['download_logs']
-
-        messages, end_of_log, next_page_token = self._read_logs(log_filter, 
next_page_token, all_pages)
-
-        new_metadata = {"end_of_log": end_of_log}
-
-        if next_page_token:
-            new_metadata['next_page_token'] = next_page_token
-
-        return [messages], [new_metadata]
-
-    def _prepare_log_filter(self, ti_labels: Dict[str, str]) -> str:
-        """
-        Prepares the filter that chooses which log entries to fetch.
-
-        More information:
-        
https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list#body.request_body.FIELDS.filter
-        https://cloud.google.com/logging/docs/view/advanced-queries
-
-        :param ti_labels: Task Instance's labels that will be used to search 
for logs
-        :type: Dict[str, str]
-        :return: logs filter
-        """
-        def escape_label_key(key: str) -> str:
-            return f'"{key}"' if "." in key else key
-
-        def escale_label_value(value: str) -> str:
-            escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
-            return f'"{escaped_value}"'
-
-        log_filters = [
-            f'resource.type={escale_label_value(self.resource.type)}',
-            f'logName="projects/{self._client.project}/logs/{self.name}"'
-        ]
-
-        for key, value in self.resource.labels.items():
-            
log_filters.append(f'resource.labels.{escape_label_key(key)}={escale_label_value(value)}')
-
-        for key, value in ti_labels.items():
-            
log_filters.append(f'labels.{escape_label_key(key)}={escale_label_value(value)}')
-        return "\n".join(log_filters)
-
-    def _read_logs(
-        self,
-        log_filter: str,
-        next_page_token: Optional[str],
-        all_pages: bool
-    ) -> Tuple[str, bool, Optional[str]]:
-        """
-        Sends requests to the Stackdriver service and downloads logs.
-
-        :param log_filter: Filter specifying the logs to be downloaded.
-        :type log_filter: str
-        :param next_page_token: The token of the page from which the log 
download will start.
-            If None is passed, it will start from the first page.
-        :param all_pages: If True is passed, all subpages will be downloaded. 
Otherwise, only the first
-            page will be downloaded
-        :return: A token that contains the following items:
-            * string with logs
-            * Boolean value describing whether there are more logs,
-            * token of the next page
-        :rtype: Tuple[str, bool, str]
-        """
-        messages = []
-        new_messages, next_page_token = self._read_single_logs_page(
-            log_filter=log_filter,
-            page_token=next_page_token,
-        )
-        messages.append(new_messages)
-        if all_pages:
-            while next_page_token:
-                new_messages, next_page_token = self._read_single_logs_page(
-                    log_filter=log_filter,
-                    page_token=next_page_token
-                )
-                messages.append(new_messages)
-
-            end_of_log = True
-            next_page_token = None
-        else:
-            end_of_log = not bool(next_page_token)
-        return "\n".join(messages), end_of_log, next_page_token
-
-    def _read_single_logs_page(self, log_filter: str, page_token: 
Optional[str] = None) -> Tuple[str, str]:
-        """
-        Sends requests to the Stackdriver service and downloads single pages 
with logs.
-
-        :param log_filter: Filter specifying the logs to be downloaded.
-        :type log_filter: str
-        :param page_token: The token of the page to be downloaded. If None is 
passed, the first page will be
-            downloaded.
-        :type page_token: str
-        :return: Downloaded logs and next page token
-        :rtype: Tuple[str, str]
-        """
-        entries = self._client.list_entries(filter_=log_filter, 
page_token=page_token)
-        page = next(entries.pages)
-        next_page_token = entries.next_page_token
-        messages = []
-        for entry in page:
-            if "message" in entry.payload:
-                messages.append(entry.payload["message"])
-
-        return "\n".join(messages), next_page_token
-
-    @classmethod
-    def _task_instance_to_labels(cls, ti: TaskInstance) -> Dict[str, str]:
-        return {
-            cls.LABEL_TASK_ID: ti.task_id,
-            cls.LABEL_DAG_ID: ti.dag_id,
-            cls.LABEL_EXECUTION_DATE: str(ti.execution_date.isoformat()),
-            cls.LABEL_TRY_NUMBER: str(ti.try_number),
-        }
+warnings.warn(
+    "This module is deprecated. Please use 
`airflow.providers.google.cloud.log.stackdriver_task_handler`.",
+    DeprecationWarning, stacklevel=2
+)
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index 1166643..0842716 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -421,3 +421,5 @@ All task log handlers are derived from 
:class:`~airflow.utils.log.file_task_hand
 
 
   airflow/providers/amazon/aws/log/index
+  airflow/providers/elasticsearch/log/index
+  airflow/providers/google/cloud/log/index
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py
index 03303cb..4c23108 100644
--- a/tests/deprecated_classes.py
+++ b/tests/deprecated_classes.py
@@ -1753,7 +1753,26 @@ UTILS = [
     )
 ]
 
-ALL = HOOKS + OPERATORS + SECRETS + SENSORS + TRANSFERS + UTILS
+LOGS = [
+    (
+        "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
+        "airflow.utils.log.s3_task_handler.S3TaskHandler"
+    ),
+    (
+        
'airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler',
+        'airflow.utils.log.cloudwatch_task_handler.CloudwatchTaskHandler'
+    ),
+    (
+        
'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler',
+        'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler'
+    ),
+    (
+        
"airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
+        "airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler"
+    )
+]
+
+ALL = HOOKS + OPERATORS + SECRETS + SENSORS + TRANSFERS + UTILS + LOGS
 
 RENAMED_ALL = [
     (old_class, new_class)
diff --git a/tests/utils/log/test_cloudwatch_task_handler.py 
b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
similarity index 100%
rename from tests/utils/log/test_cloudwatch_task_handler.py
rename to tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
diff --git a/tests/providers/elasticsearch/log/__init__.py 
b/tests/providers/elasticsearch/log/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/elasticsearch/log/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/utils/log/elasticmock/__init__.py 
b/tests/providers/elasticsearch/log/elasticmock/__init__.py
similarity index 100%
rename from tests/utils/log/elasticmock/__init__.py
rename to tests/providers/elasticsearch/log/elasticmock/__init__.py
diff --git a/tests/utils/log/elasticmock/fake_elasticsearch.py 
b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
similarity index 100%
rename from tests/utils/log/elasticmock/fake_elasticsearch.py
rename to tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
diff --git a/tests/utils/log/elasticmock/utilities/__init__.py 
b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
similarity index 100%
rename from tests/utils/log/elasticmock/utilities/__init__.py
rename to tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
diff --git a/tests/utils/log/test_es_task_handler.py 
b/tests/providers/elasticsearch/log/test_es_task_handler.py
similarity index 99%
rename from tests/utils/log/test_es_task_handler.py
rename to tests/providers/elasticsearch/log/test_es_task_handler.py
index 8453ca8..bb82886 100644
--- a/tests/utils/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -30,8 +30,8 @@ from parameterized import parameterized
 from airflow.configuration import conf
 from airflow.models import DAG, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.providers.elasticsearch.log.es_task_handler import 
ElasticsearchTaskHandler
 from airflow.utils import timezone
-from airflow.utils.log.es_task_handler import ElasticsearchTaskHandler
 from airflow.utils.state import State
 from airflow.utils.timezone import datetime
 
diff --git a/tests/providers/google/cloud/log/__init__.py 
b/tests/providers/google/cloud/log/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/google/cloud/log/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/utils/log/test_stackdriver_task_handler.py 
b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
similarity index 85%
rename from tests/utils/log/test_stackdriver_task_handler.py
rename to tests/providers/google/cloud/log/test_stackdriver_task_handler.py
index 9af7d2d..684a0a8 100644
--- a/tests/utils/log/test_stackdriver_task_handler.py
+++ b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
@@ -25,7 +25,7 @@ from google.cloud.logging.resource import Resource
 from airflow.models import TaskInstance
 from airflow.models.dag import DAG
 from airflow.operators.dummy_operator import DummyOperator
-from airflow.utils.log.stackdriver_task_handler import StackdriverTaskHandler
+from airflow.providers.google.cloud.log.stackdriver_task_handler import 
StackdriverTaskHandler
 from airflow.utils.state import State
 
 
@@ -36,8 +36,8 @@ def _create_list_response(messages, token):
 
 class TestStackdriverLoggingHandlerStandalone(unittest.TestCase):
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
     def test_should_pass_message_to_client(self, mock_client, 
mock_get_creds_and_project_id):
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
 
@@ -78,8 +78,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         self.ti.state = State.RUNNING
         self.addCleanup(self.dag.clear)
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
     def test_should_set_labels(self, mock_client, 
mock_get_creds_and_project_id):
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
 
@@ -100,8 +100,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
             mock.ANY, 'test-message', labels=labels, resource=resource
         )
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
     def test_should_append_labels(self, mock_client, 
mock_get_creds_and_project_id):
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
         self.stackdriver_task_handler = StackdriverTaskHandler(
@@ -126,9 +126,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
             mock.ANY, 'test-message', labels=labels, resource=resource
         )
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
     @mock.patch(
-        'airflow.utils.log.stackdriver_task_handler.gcp_logging.Client',
+        
'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
         **{'return_value.project': 'asf-project'}  # type: ignore
     )
     def test_should_read_logs_for_all_try(self, mock_client, 
mock_get_creds_and_project_id):
@@ -147,9 +147,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         self.assertEqual(['MSG1\nMSG2'], logs)
         self.assertEqual([{'end_of_log': True}], metadata)
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
     @mock.patch(  # type: ignore
-        'airflow.utils.log.stackdriver_task_handler.gcp_logging.Client',
+        
'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
         **{'return_value.project': 'asf-project'}  # type: ignore
     )
     def test_should_read_logs_for_task_with_quote(self, mock_client, 
mock_get_creds_and_project_id):
@@ -168,9 +168,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         self.assertEqual(['MSG1\nMSG2'], logs)
         self.assertEqual([{'end_of_log': True}], metadata)
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
     @mock.patch(
-        'airflow.utils.log.stackdriver_task_handler.gcp_logging.Client',
+        
'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
         **{'return_value.project': 'asf-project'}  # type: ignore
     )
     def test_should_read_logs_for_single_try(self, mock_client, 
mock_get_creds_and_project_id):
@@ -190,8 +190,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         self.assertEqual(['MSG1\nMSG2'], logs)
         self.assertEqual([{'end_of_log': True}], metadata)
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
     def test_should_read_logs_with_pagination(self, mock_client, 
mock_get_creds_and_project_id):
         mock_client.return_value.list_entries.side_effect = [
             _create_list_response(["MSG1", "MSG2"], "TOKEN1"),
@@ -213,8 +213,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         self.assertEqual(['MSG3\nMSG4'], logs)
         self.assertEqual([{'end_of_log': True}], metadata2)
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
     def test_should_read_logs_with_download(self, mock_client, 
mock_get_creds_and_project_id):
         mock_client.return_value.list_entries.side_effect = [
             _create_list_response(["MSG1", "MSG2"], "TOKEN1"),
@@ -227,9 +227,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         self.assertEqual(['MSG1\nMSG2\nMSG3\nMSG4'], logs)
         self.assertEqual([{'end_of_log': True}], metadata1)
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
     @mock.patch(
-        'airflow.utils.log.stackdriver_task_handler.gcp_logging.Client',
+        
'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
         **{'return_value.project': 'asf-project'}  # type: ignore
     )
     def test_should_read_logs_with_custom_resources(self, mock_client, 
mock_get_creds_and_project_id):
@@ -267,8 +267,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         self.assertEqual(['TEXT\nTEXT'], logs)
         self.assertEqual([{'end_of_log': True}], metadata)
 
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
-    
@mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+    
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
     def test_should_use_credentials(self, mock_client, 
mock_get_creds_and_project_id):
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
 

Reply via email to