This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new a79e2d4 Move provider's log task handlers to the provider package (#9604)
a79e2d4 is described below
commit a79e2d4c4aa105f3fac5ae6a28e29af9cd572407
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Jul 6 08:05:40 2020 +0100
Move provider's log task handlers to the provider package (#9604)
---
UPDATING.md | 14 +-
airflow/config_templates/airflow_local_settings.py | 4 +-
.../amazon/aws}/log/cloudwatch_task_handler.py | 0
airflow/providers/elasticsearch/log/__init__.py | 16 ++
.../elasticsearch}/log/es_task_handler.py | 5 +-
airflow/providers/google/cloud/log/__init__.py | 16 ++
.../google/cloud}/log/stackdriver_task_handler.py | 6 +-
airflow/utils/log/cloudwatch_task_handler.py | 110 +-------
airflow/utils/log/es_task_handler.py | 287 +--------------------
airflow/utils/log/stackdriver_task_handler.py | 283 +-------------------
docs/autoapi_templates/index.rst | 2 +
tests/deprecated_classes.py | 21 +-
.../aws}/log/test_cloudwatch_task_handler.py | 0
tests/providers/elasticsearch/log/__init__.py | 16 ++
.../elasticsearch}/log/elasticmock/__init__.py | 0
.../log/elasticmock/fake_elasticsearch.py | 0
.../log/elasticmock/utilities/__init__.py | 0
.../elasticsearch}/log/test_es_task_handler.py | 2 +-
tests/providers/google/cloud/log/__init__.py | 16 ++
.../cloud}/log/test_stackdriver_task_handler.py | 42 +--
20 files changed, 161 insertions(+), 679 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 5945f7a..8d6f624 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -61,9 +61,21 @@ More tips can be found in the guide:
https://developers.google.com/style/inclusive-documentation
-->
+### StackdriverTaskHandler has been moved
+The `StackdriverTaskHandler` class from `airflow.utils.log.stackdriver_task_handler` has been moved to
+`airflow.providers.google.cloud.log.stackdriver_task_handler`. This is because it has items specific to `google cloud`.
+
### S3TaskHandler has been moved
The `S3TaskHandler` class from `airflow.utils.log.s3_task_handler` has been moved to
-`airflow.providers.amazon.aws.log.s3_task_handler`. This is because it has items specific to `aws`
+`airflow.providers.amazon.aws.log.s3_task_handler`. This is because it has items specific to `aws`.
+
+### ElasticsearchTaskHandler has been moved
+The `ElasticsearchTaskHandler` class from `airflow.utils.log.es_task_handler` has been moved to
+`airflow.providers.elasticsearch.log.es_task_handler`. This is because it has items specific to `elasticsearch`.
+
+### CloudwatchTaskHandler has been moved
+The `CloudwatchTaskHandler` class from `airflow.utils.log.cloudwatch_task_handler` has been moved to
+`airflow.providers.amazon.aws.log.cloudwatch_task_handler`. This is because it has items specific to `aws`.
### SendGrid emailer has been moved
Formerly the core code was maintained by the original creators - Airbnb. The code that was in the contrib
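
For most users the migration is an import-path change only; the old modules keep working for now but emit a DeprecationWarning (see the shim modules later in this diff). A minimal before/after sketch, using StackdriverTaskHandler as the example:

    # Deprecated path - still importable, warns on first import
    from airflow.utils.log.stackdriver_task_handler import StackdriverTaskHandler

    # New provider path
    from airflow.providers.google.cloud.log.stackdriver_task_handler import StackdriverTaskHandler

The same substitution applies to CloudwatchTaskHandler, ElasticsearchTaskHandler and S3TaskHandler under their respective provider packages.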
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 0e92c84..f927a4a 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -223,7 +223,7 @@ if REMOTE_LOGGING:
log_name = urlparse(REMOTE_BASE_LOG_FOLDER).path[1:]
STACKDRIVER_REMOTE_HANDLERS = {
'task': {
- 'class': 'airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler',
+ 'class': 'airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler',
'formatter': 'airflow',
'name': log_name,
'gcp_key_path': key_path
@@ -241,7 +241,7 @@ if REMOTE_LOGGING:
ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
'task': {
- 'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
+ 'class': 'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler',
'formatter': 'airflow',
'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE,
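
If you maintain a customized airflow_local_settings.py, the remote handler entries should now reference the provider paths shown above. A minimal sketch mirroring the template (log_name and key_path are assumed to be defined earlier in the file, as in the template itself):

    STACKDRIVER_REMOTE_HANDLERS = {
        'task': {
            'class': 'airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler',
            'formatter': 'airflow',
            'name': log_name,
            'gcp_key_path': key_path
        }
    }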
diff --git a/airflow/utils/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
similarity index 100%
copy from airflow/utils/log/cloudwatch_task_handler.py
copy to airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
diff --git a/airflow/providers/elasticsearch/log/__init__.py b/airflow/providers/elasticsearch/log/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/elasticsearch/log/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/utils/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
similarity index 98%
copy from airflow/utils/log/es_task_handler.py
copy to airflow/providers/elasticsearch/log/es_task_handler.py
index b6e1687..4ff7a88 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -31,10 +31,10 @@ from airflow.utils import timezone
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.json_formatter import JSONFormatter
-from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin, ExternalLoggingMixin):
+class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
"""
ElasticsearchTaskHandler is a python log handler that
reads logs from Elasticsearch. Note logs are not directly
@@ -270,6 +270,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin, ExternalLoggingMix
@property
def log_name(self):
+ """ The log name"""
return self.LOG_NAME
def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str:
diff --git a/airflow/providers/google/cloud/log/__init__.py b/airflow/providers/google/cloud/log/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/google/cloud/log/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/utils/log/stackdriver_task_handler.py b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
similarity index 98%
copy from airflow/utils/log/stackdriver_task_handler.py
copy to airflow/providers/google/cloud/log/stackdriver_task_handler.py
index 4865fb3..87ed20e 100644
--- a/airflow/utils/log/stackdriver_task_handler.py
+++ b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -61,7 +61,7 @@ class StackdriverTaskHandler(logging.Handler):
:type scopes: Sequence[str]
:param name: the name of the custom log in Stackdriver Logging. Defaults
to 'airflow'. The name of the Python logger will be represented
- in the ``python_logger`` field.
+ in the ``python_logger`` field.
:type name: str
:param transport: Class for creating new transport objects. It should
extend from the base :class:`google.cloud.logging.handlers.Transport` type and
@@ -144,7 +144,7 @@ class StackdriverTaskHandler(logging.Handler):
Configures the logger to add information with information about the current task
:param task_instance: Currently executed task
- :type task_instance: TaskInstance
+ :type task_instance: :class:`airflow.models.TaskInstance`
"""
self.task_instance_labels = self._task_instance_to_labels(task_instance)
@@ -155,7 +155,7 @@ class StackdriverTaskHandler(logging.Handler):
Read logs of given task instance from Stackdriver logging.
:param task_instance: task instance object
- :type: task_instance: TaskInstance
+ :type task_instance: :class:`airflow.models.TaskInstance`
:param try_number: task instance try_number to read logs from. If None
it returns all logs
:type try_number: Optional[int]
diff --git a/airflow/utils/log/cloudwatch_task_handler.py b/airflow/utils/log/cloudwatch_task_handler.py
index 2d22452..f468a89 100644
--- a/airflow/utils/log/cloudwatch_task_handler.py
+++ b/airflow/utils/log/cloudwatch_task_handler.py
@@ -15,101 +15,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-import watchtower
-from cached_property import cached_property
-
-from airflow.configuration import conf
-from airflow.utils.log.file_task_handler import FileTaskHandler
-from airflow.utils.log.logging_mixin import LoggingMixin
-
-
-class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
- """
- CloudwatchTaskHandler is a python log handler that handles and reads task instance logs.
-
- It extends airflow FileTaskHandler and uploads to and reads from Cloudwatch.
-
- :param base_log_folder: base folder to store logs locally
- :type base_log_folder: str
- :param log_group_arn: ARN of the Cloudwatch log group for remote log storage
- with format ``arn:aws:logs:{region name}:{account id}:log-group:{group name}``
- :type log_group_arn: str
- :param filename_template: template for file name (local storage) or log stream name (remote)
- :type filename_template: str
- """
- def __init__(self, base_log_folder, log_group_arn, filename_template):
- super().__init__(base_log_folder, filename_template)
- split_arn = log_group_arn.split(':')
-
- self.handler = None
- self.log_group = split_arn[6]
- self.region_name = split_arn[3]
- self.closed = False
-
- @cached_property
- def hook(self):
- """
- Returns AwsLogsHook.
- """
- remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
- try:
- from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
- return AwsLogsHook(aws_conn_id=remote_conn_id, region_name=self.region_name)
- except Exception: # pylint: disable=broad-except
- self.log.error(
- 'Could not create an AwsLogsHook with connection id "%s". '
- 'Please make sure that airflow[aws] is installed and '
- 'the Cloudwatch logs connection exists.', remote_conn_id
- )
-
- def _render_filename(self, ti, try_number):
- # Replace unsupported log group name characters
- return super()._render_filename(ti, try_number).replace(':', '_')
-
- def set_context(self, ti):
- super().set_context(ti)
- self.handler = watchtower.CloudWatchLogHandler(
- log_group=self.log_group,
- stream_name=self._render_filename(ti, ti.try_number),
- boto3_session=self.hook.get_session(self.region_name)
- )
-
- def close(self):
- """
- Close the handler responsible for the upload of the local log file to Cloudwatch.
- """
- # When application exit, system shuts down all handlers by
- # calling close method. Here we check if logger is already
- # closed to prevent uploading the log to remote storage multiple
- # times when `logging.shutdown` is called.
- if self.closed:
- return
-
- if self.handler is not None:
- self.handler.close()
- # Mark closed so we don't double write if close is called twice
- self.closed = True
-
- def _read(self, task_instance, try_number, metadata=None):
- stream_name = self._render_filename(task_instance, try_number)
- return '*** Reading remote log from Cloudwatch log_group: {} log_stream: {}.\n{}\n'.format(
- self.log_group, stream_name, self.get_cloudwatch_logs(stream_name=stream_name)
- ), {'end_of_log': True}
-
- def get_cloudwatch_logs(self, stream_name):
- """
- Return all logs from the given log stream.
-
- :param stream_name: name of the Cloudwatch log stream to get all logs from
- :return: string of all logs from the given log stream
- """
- try:
- events = list(self.hook.get_log_events(log_group=self.log_group, log_stream_name=stream_name))
- return '\n'.join(reversed([event['message'] for event in events]))
- except Exception: # pylint: disable=broad-except
- msg = 'Could not read remote logs from log_group: {} log_stream: {}.'.format(
- self.log_group, stream_name
- )
- self.log.exception(msg)
- return msg
+"""
+This module is deprecated. Please use `airflow.providers.amazon.aws.log.cloudwatch_task_handler`.
+"""
+import warnings
+
+# pylint: disable=unused-import
+from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudwatchTaskHandler # noqa
+
+warnings.warn(
+ "This module is deprecated. Please use
`airflow.providers.amazon.aws.log.cloudwatch_task_handler`.",
+ DeprecationWarning, stacklevel=2
+)
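
The old module is now a thin shim: it re-exports the provider class and warns once at import time. A quick, illustrative way to confirm the warning fires (in a fresh interpreter, since a module-level warning is only raised on first import):

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        from airflow.utils.log.cloudwatch_task_handler import CloudwatchTaskHandler  # noqa: F401

    assert any(issubclass(w.category, DeprecationWarning) for w in caught)

The es_task_handler and stackdriver_task_handler modules below follow the same shim pattern.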
diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py
index b6e1687..2a0a19f 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -15,278 +15,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-import logging
-import sys
-from urllib.parse import quote
-
-# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
-import elasticsearch
-import pendulum
-from elasticsearch_dsl import Search
-
-from airflow.configuration import conf
-from airflow.models import TaskInstance
-from airflow.utils import timezone
-from airflow.utils.helpers import parse_template_string
-from airflow.utils.log.file_task_handler import FileTaskHandler
-from airflow.utils.log.json_formatter import JSONFormatter
-from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
-
-
-class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin, ExternalLoggingMixin):
- """
- ElasticsearchTaskHandler is a python log handler that
- reads logs from Elasticsearch. Note logs are not directly
- indexed into Elasticsearch. Instead, it flushes logs
- into local files. Additional software setup is required
- to index the log into Elasticsearch, such as using
- Filebeat and Logstash.
- To efficiently query and sort Elasticsearch results, we assume each
- log message has a field `log_id` consists of ti primary keys:
- `log_id = {dag_id}-{task_id}-{execution_date}-{try_number}`
- Log messages with specific log_id are sorted based on `offset`,
- which is a unique integer indicates log message's order.
- Timestamp here are unreliable because multiple log messages
- might have the same timestamp.
- """
-
- PAGE = 0
- MAX_LINE_PER_PAGE = 1000
- LOG_NAME = 'Elasticsearch'
-
- def __init__(self, base_log_folder, filename_template, # pylint: disable=too-many-arguments
- log_id_template, end_of_log_mark,
- write_stdout, json_format, json_fields,
- host='localhost:9200',
- frontend='localhost:5601',
- es_kwargs=conf.getsection("elasticsearch_configs")):
- """
- :param base_log_folder: base folder to store logs locally
- :param log_id_template: log id template
- :param host: Elasticsearch host name
- """
- es_kwargs = es_kwargs or {}
- super().__init__(
- base_log_folder, filename_template)
- self.closed = False
-
- self.log_id_template, self.log_id_jinja_template = \
- parse_template_string(log_id_template)
-
- self.client = elasticsearch.Elasticsearch([host], **es_kwargs)
-
- self.frontend = frontend
- self.mark_end_on_close = True
- self.end_of_log_mark = end_of_log_mark
- self.write_stdout = write_stdout
- self.json_format = json_format
- self.json_fields = [label.strip() for label in json_fields.split(",")]
- self.handler = None
- self.context_set = False
-
- def _render_log_id(self, ti, try_number):
- if self.log_id_jinja_template:
- jinja_context = ti.get_template_context()
- jinja_context['try_number'] = try_number
- return self.log_id_jinja_template.render(**jinja_context)
-
- if self.json_format:
- execution_date = self._clean_execution_date(ti.execution_date)
- else:
- execution_date = ti.execution_date.isoformat()
- return self.log_id_template.format(dag_id=ti.dag_id,
- task_id=ti.task_id,
- execution_date=execution_date,
- try_number=try_number)
-
- @staticmethod
- def _clean_execution_date(execution_date):
- """
- Clean up an execution date so that it is safe to query in elasticsearch
- by removing reserved characters.
- # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
-
- :param execution_date: execution date of the dag run.
- """
- return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f")
-
- def _read(self, ti, try_number, metadata=None):
- """
- Endpoint for streaming log.
-
- :param ti: task instance object
- :param try_number: try_number of the task instance
- :param metadata: log metadata,
- can be used for steaming log reading and auto-tailing.
- :return: a list of log documents and metadata.
- """
- if not metadata:
- metadata = {'offset': 0}
- if 'offset' not in metadata:
- metadata['offset'] = 0
-
- offset = metadata['offset']
- log_id = self._render_log_id(ti, try_number)
-
- logs = self.es_read(log_id, offset, metadata)
-
- next_offset = offset if not logs else logs[-1].offset
-
- # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
- # on the client. Sending as a string prevents this issue.
- # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
- metadata['offset'] = str(next_offset)
-
- # end_of_log_mark may contain characters like '\n' which is needed to
- # have the log uploaded but will not be stored in elasticsearch.
- metadata['end_of_log'] = False if not logs \
- else logs[-1].message == self.end_of_log_mark.strip()
-
- cur_ts = pendulum.now()
- # Assume end of log after not receiving new log for 5 min,
- # as executor heartbeat is 1 min and there might be some
- # delay before Elasticsearch makes the log available.
- if 'last_log_timestamp' in metadata:
- last_log_ts = timezone.parse(metadata['last_log_timestamp'])
- if cur_ts.diff(last_log_ts).in_minutes() >= 5 or 'max_offset' in metadata \
- and offset >= metadata['max_offset']:
- metadata['end_of_log'] = True
-
- if offset != next_offset or 'last_log_timestamp' not in metadata:
- metadata['last_log_timestamp'] = str(cur_ts)
-
- # If we hit the end of the log, remove the actual end_of_log message
- # to prevent it from showing in the UI.
- i = len(logs) if not metadata['end_of_log'] else len(logs) - 1
- message = '\n'.join([log.message for log in logs[0:i]])
-
- return message, metadata
-
- def es_read(self, log_id, offset, metadata):
- """
- Returns the logs matching log_id in Elasticsearch and next offset.
- Returns '' if no log is found or there was an error.
-
- :param log_id: the log_id of the log to read.
- :type log_id: str
- :param offset: the offset start to read log from.
- :type offset: str
- :param metadata: log metadata, used for steaming log download.
- :type metadata: dict
- """
-
- # Offset is the unique key for sorting logs given log_id.
- search = Search(using=self.client) \
- .query('match_phrase', log_id=log_id) \
- .sort('offset')
-
- search = search.filter('range', offset={'gt': int(offset)})
- max_log_line = search.count()
- if 'download_logs' in metadata and metadata['download_logs'] and 'max_offset' not in metadata:
- try:
- if max_log_line > 0:
- metadata['max_offset'] = search[max_log_line - 1].execute()[-1].offset
- else:
- metadata['max_offset'] = 0
- except Exception: # pylint: disable=broad-except
- self.log.exception('Could not get current log size with log_id: %s', log_id)
-
- logs = []
- if max_log_line != 0:
- try:
-
- logs = search[self.MAX_LINE_PER_PAGE * self.PAGE:self.MAX_LINE_PER_PAGE] \
- .execute()
- except Exception as e: # pylint: disable=broad-except
- self.log.exception('Could not read log with log_id: %s, error: %s', log_id, str(e))
-
- return logs
-
- def set_context(self, ti):
- """
- Provide task_instance context to airflow task handler.
-
- :param ti: task instance object
- """
- self.mark_end_on_close = not ti.raw
-
- if self.json_format:
- self.formatter = JSONFormatter(
- json_fields=self.json_fields,
- extras={
- 'dag_id': str(ti.dag_id),
- 'task_id': str(ti.task_id),
- 'execution_date': self._clean_execution_date(ti.execution_date),
- 'try_number': str(ti.try_number)
- })
-
- if self.write_stdout:
- if self.context_set:
- # We don't want to re-set up the handler if this logger has
- # already been initialized
- return
-
- self.handler = logging.StreamHandler(stream=sys.__stdout__)
- self.handler.setLevel(self.level)
- self.handler.setFormatter(self.formatter)
- else:
- super().set_context(ti)
- self.context_set = True
-
- def close(self):
- # When application exit, system shuts down all handlers by
- # calling close method. Here we check if logger is already
- # closed to prevent uploading the log to remote storage multiple
- # times when `logging.shutdown` is called.
- if self.closed:
- return
-
- if not self.mark_end_on_close:
- self.closed = True
- return
-
- # Case which context of the handler was not set.
- if self.handler is None:
- self.closed = True
- return
-
- # Reopen the file stream, because FileHandler.close() would be called
- # first in logging.shutdown() and the stream in it would be set to None.
- if self.handler.stream is None or self.handler.stream.closed:
- self.handler.stream = self.handler._open() # pylint: disable=protected-access
-
- # Mark the end of file using end of log mark,
- # so we know where to stop while auto-tailing.
- self.handler.stream.write(self.end_of_log_mark)
-
- if self.write_stdout:
- self.handler.close()
- sys.stdout = sys.__stdout__
-
- super().close()
-
- self.closed = True
-
- @property
- def log_name(self):
- return self.LOG_NAME
-
- def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str:
- """
- Creates an address for an external log collecting service.
-
- :param task_instance: task instance object
- :type: task_instance: TaskInstance
- :param try_number: task instance try_number to read logs from.
- :type try_number: Optional[int]
- :return: URL to the external log collection service
- :rtype: str
- """
- log_id = self.log_id_template.format(
- dag_id=task_instance.dag_id,
- task_id=task_instance.task_id,
- execution_date=task_instance.execution_date,
- try_number=try_number)
- url = 'https://' + self.frontend.format(log_id=quote(log_id))
- return url
+"""
+This module is deprecated. Please use `airflow.providers.elasticsearch.log.es_task_handler`.
+"""
+import warnings
+
+# pylint: disable=unused-import
+from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler # noqa
+
+warnings.warn(
+ "This module is deprecated. Please use
`airflow.providers.elasticsearch.log.es_task_handler`.",
+ DeprecationWarning, stacklevel=2
+)
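
Because the shim re-exports the very same class object rather than a copy, isinstance checks and subclasses keep working across both paths. Illustrative only:

    from airflow.utils.log.es_task_handler import ElasticsearchTaskHandler as OldPath
    from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler as NewPath

    assert OldPath is NewPath  # one class, two import paths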
diff --git a/airflow/utils/log/stackdriver_task_handler.py b/airflow/utils/log/stackdriver_task_handler.py
index 4865fb3..903a64f 100644
--- a/airflow/utils/log/stackdriver_task_handler.py
+++ b/airflow/utils/log/stackdriver_task_handler.py
@@ -15,281 +15,14 @@
# specific language governing permissions and limitations
# under the License.
"""
-Handler that integrates with Stackdriver
+This module is deprecated. Please use `airflow.providers.google.cloud.log.stackdriver_task_handler`.
"""
-import logging
-from typing import Collection, Dict, List, Optional, Tuple, Type
+import warnings
-from cached_property import cached_property
-from google.api_core.gapic_v1.client_info import ClientInfo
-from google.cloud import logging as gcp_logging
-from google.cloud.logging.handlers.transports import BackgroundThreadTransport, Transport
-from google.cloud.logging.resource import Resource
+# pylint: disable=unused-import
+from airflow.providers.google.cloud.log.stackdriver_task_handler import StackdriverTaskHandler # noqa
-from airflow import version
-from airflow.models import TaskInstance
-from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id
-
-DEFAULT_LOGGER_NAME = "airflow"
-_GLOBAL_RESOURCE = Resource(type="global", labels={})
-
-_DEFAULT_SCOPESS = frozenset([
- "https://www.googleapis.com/auth/logging.read",
- "https://www.googleapis.com/auth/logging.write"
-])
-
-
-class StackdriverTaskHandler(logging.Handler):
- """Handler that directly makes Stackdriver logging API calls.
-
- This is a Python standard ``logging`` handler using that can be used to
- route Python standard logging messages directly to the Stackdriver
- Logging API.
-
- It can also be used to save logs for executing tasks. To do this, you should set as a handler with
- the name "tasks". In this case, it will also be used to read the log for display in Web UI.
-
- This handler supports both an asynchronous and synchronous transport.
-
-
- :param gcp_key_path: Path to GCP Credential JSON file.
- If ommited, authorization based on `the Application Default Credentials
- <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
- be used.
- :type gcp_key_path: str
- :param scopes: OAuth scopes for the credentials,
- :type scopes: Sequence[str]
- :param name: the name of the custom log in Stackdriver Logging. Defaults
- to 'airflow'. The name of the Python logger will be represented
- in the ``python_logger`` field.
- :type name: str
- :param transport: Class for creating new transport objects. It should
- extend from the base :class:`google.cloud.logging.handlers.Transport` type and
- implement :meth`google.cloud.logging.handlers.Transport.send`. Defaults to
- :class:`google.cloud.logging.handlers.BackgroundThreadTransport`. The other
- option is :class:`google.cloud.logging.handlers.SyncTransport`.
- :type transport: :class:`type`
- :param resource: (Optional) Monitored resource of the entry, defaults
- to the global resource type.
- :type resource: :class:`~google.cloud.logging.resource.Resource`
- :param labels: (Optional) Mapping of labels for the entry.
- :type labels: dict
- """
-
- LABEL_TASK_ID = "task_id"
- LABEL_DAG_ID = "dag_id"
- LABEL_EXECUTION_DATE = "execution_date"
- LABEL_TRY_NUMBER = "try_number"
-
- def __init__(
- self,
- gcp_key_path: Optional[str] = None,
- # See: https://github.com/PyCQA/pylint/issues/2377
- scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS, # pylint: disable=unsubscriptable-object
- name: str = DEFAULT_LOGGER_NAME,
- transport: Type[Transport] = BackgroundThreadTransport,
- resource: Resource = _GLOBAL_RESOURCE,
- labels: Optional[Dict[str, str]] = None,
- ):
- super().__init__()
- self.gcp_key_path: Optional[str] = gcp_key_path
- # See: https://github.com/PyCQA/pylint/issues/2377
- self.scopes: Optional[Collection[str]] = scopes # pylint: disable=unsubscriptable-object
- self.name: str = name
- self.transport_type: Type[Transport] = transport
- self.resource: Resource = resource
- self.labels: Optional[Dict[str, str]] = labels
- self.task_instance_labels: Optional[Dict[str, str]] = {}
-
- @cached_property
- def _client(self) -> gcp_logging.Client:
- """Google Cloud Library API client"""
- credentials, _ = get_credentials_and_project_id(
- key_path=self.gcp_key_path,
- scopes=self.scopes,
- )
- client = gcp_logging.Client(
- credentials=credentials,
- client_info=ClientInfo(client_library_version='airflow_v' + version.version)
- )
- return client
-
- @cached_property
- def _transport(self) -> Transport:
- """Object responsible for sending data to Stackdriver"""
- return self.transport_type(self._client, self.name)
-
- def emit(self, record: logging.LogRecord) -> None:
- """Actually log the specified logging record.
-
- :param record: The record to be logged.
- :type record: logging.LogRecord
- """
- message = self.format(record)
- labels: Optional[Dict[str, str]]
- if self.labels and self.task_instance_labels:
- labels = {}
- labels.update(self.labels)
- labels.update(self.task_instance_labels)
- elif self.labels:
- labels = self.labels
- elif self.task_instance_labels:
- labels = self.task_instance_labels
- else:
- labels = None
- self._transport.send(record, message, resource=self.resource, labels=labels)
-
- def set_context(self, task_instance: TaskInstance) -> None:
- """
- Configures the logger to add information with information about the current task
-
- :param task_instance: Currently executed task
- :type task_instance: TaskInstance
- """
- self.task_instance_labels = self._task_instance_to_labels(task_instance)
-
- def read(
- self, task_instance: TaskInstance, try_number: Optional[int] = None, metadata: Optional[Dict] = None
- ) -> Tuple[List[str], List[Dict]]:
- """
- Read logs of given task instance from Stackdriver logging.
-
- :param task_instance: task instance object
- :type: task_instance: TaskInstance
- :param try_number: task instance try_number to read logs from. If None
- it returns all logs
- :type try_number: Optional[int]
- :param metadata: log metadata. It is used for steaming log reading and auto-tailing.
- :type metadata: Dict
- :return: a tuple of list of logs and list of metadata
- :rtype: Tuple[List[str], List[Dict]]
- """
- if try_number is not None and try_number < 1:
- logs = ["Error fetching the logs. Try number {} is
invalid.".format(try_number)]
- return logs, [{"end_of_log": "true"}]
-
- if not metadata:
- metadata = {}
-
- ti_labels = self._task_instance_to_labels(task_instance)
-
- if try_number is not None:
- ti_labels[self.LABEL_TRY_NUMBER] = str(try_number)
- else:
- del ti_labels[self.LABEL_TRY_NUMBER]
-
- log_filter = self._prepare_log_filter(ti_labels)
- next_page_token = metadata.get("next_page_token", None)
- all_pages = 'download_logs' in metadata and metadata['download_logs']
-
- messages, end_of_log, next_page_token = self._read_logs(log_filter, next_page_token, all_pages)
-
- new_metadata = {"end_of_log": end_of_log}
-
- if next_page_token:
- new_metadata['next_page_token'] = next_page_token
-
- return [messages], [new_metadata]
-
- def _prepare_log_filter(self, ti_labels: Dict[str, str]) -> str:
- """
- Prepares the filter that chooses which log entries to fetch.
-
- More information:
- https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list#body.request_body.FIELDS.filter
- https://cloud.google.com/logging/docs/view/advanced-queries
-
- :param ti_labels: Task Instance's labels that will be used to search for logs
- :type: Dict[str, str]
- :return: logs filter
- """
- def escape_label_key(key: str) -> str:
- return f'"{key}"' if "." in key else key
-
- def escale_label_value(value: str) -> str:
- escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
- return f'"{escaped_value}"'
-
- log_filters = [
- f'resource.type={escale_label_value(self.resource.type)}',
- f'logName="projects/{self._client.project}/logs/{self.name}"'
- ]
-
- for key, value in self.resource.labels.items():
- log_filters.append(f'resource.labels.{escape_label_key(key)}={escale_label_value(value)}')
-
- for key, value in ti_labels.items():
- log_filters.append(f'labels.{escape_label_key(key)}={escale_label_value(value)}')
- return "\n".join(log_filters)
-
- def _read_logs(
- self,
- log_filter: str,
- next_page_token: Optional[str],
- all_pages: bool
- ) -> Tuple[str, bool, Optional[str]]:
- """
- Sends requests to the Stackdriver service and downloads logs.
-
- :param log_filter: Filter specifying the logs to be downloaded.
- :type log_filter: str
- :param next_page_token: The token of the page from which the log download will start.
- If None is passed, it will start from the first page.
- :param all_pages: If True is passed, all subpages will be downloaded. Otherwise, only the first
- page will be downloaded
- :return: A token that contains the following items:
- * string with logs
- * Boolean value describing whether there are more logs,
- * token of the next page
- :rtype: Tuple[str, bool, str]
- """
- messages = []
- new_messages, next_page_token = self._read_single_logs_page(
- log_filter=log_filter,
- page_token=next_page_token,
- )
- messages.append(new_messages)
- if all_pages:
- while next_page_token:
- new_messages, next_page_token = self._read_single_logs_page(
- log_filter=log_filter,
- page_token=next_page_token
- )
- messages.append(new_messages)
-
- end_of_log = True
- next_page_token = None
- else:
- end_of_log = not bool(next_page_token)
- return "\n".join(messages), end_of_log, next_page_token
-
- def _read_single_logs_page(self, log_filter: str, page_token: Optional[str] = None) -> Tuple[str, str]:
- """
- Sends requests to the Stackdriver service and downloads single pages with logs.
-
- :param log_filter: Filter specifying the logs to be downloaded.
- :type log_filter: str
- :param page_token: The token of the page to be downloaded. If None is passed, the first page will be
- downloaded.
- :type page_token: str
- :return: Downloaded logs and next page token
- :rtype: Tuple[str, str]
- """
- entries = self._client.list_entries(filter_=log_filter, page_token=page_token)
- page = next(entries.pages)
- next_page_token = entries.next_page_token
- messages = []
- for entry in page:
- if "message" in entry.payload:
- messages.append(entry.payload["message"])
-
- return "\n".join(messages), next_page_token
-
- @classmethod
- def _task_instance_to_labels(cls, ti: TaskInstance) -> Dict[str, str]:
- return {
- cls.LABEL_TASK_ID: ti.task_id,
- cls.LABEL_DAG_ID: ti.dag_id,
- cls.LABEL_EXECUTION_DATE: str(ti.execution_date.isoformat()),
- cls.LABEL_TRY_NUMBER: str(ti.try_number),
- }
+warnings.warn(
+ "This module is deprecated. Please use
`airflow.providers.google.cloud.log.stackdriver_task_handler`.",
+ DeprecationWarning, stacklevel=2
+)
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index 1166643..0842716 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -421,3 +421,5 @@ All task log handlers are derived from :class:`~airflow.utils.log.file_task_hand
airflow/providers/amazon/aws/log/index
+ airflow/providers/elasticsearch/log/index
+ airflow/providers/google/cloud/log/index
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py
index 03303cb..4c23108 100644
--- a/tests/deprecated_classes.py
+++ b/tests/deprecated_classes.py
@@ -1753,7 +1753,26 @@ UTILS = [
)
]
-ALL = HOOKS + OPERATORS + SECRETS + SENSORS + TRANSFERS + UTILS
+LOGS = [
+ (
+ "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
+ "airflow.utils.log.s3_task_handler.S3TaskHandler"
+ ),
+ (
+ 'airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler',
+ 'airflow.utils.log.cloudwatch_task_handler.CloudwatchTaskHandler'
+ ),
+ (
+ 'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler',
+ 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler'
+ ),
+ (
+ "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
+ "airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler"
+ )
+]
+
+ALL = HOOKS + OPERATORS + SECRETS + SENSORS + TRANSFERS + UTILS + LOGS
RENAMED_ALL = [
(old_class, new_class)
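
Each (new, old) pair added to LOGS can be verified by importing both paths and checking that they resolve to the same object. A hypothetical standalone version of that check (the real harness in tests/deprecated_classes.py may differ):

    import importlib

    def assert_is_alias(new_path: str, old_path: str) -> None:
        new_module, new_name = new_path.rsplit('.', 1)
        old_module, old_name = old_path.rsplit('.', 1)
        new_cls = getattr(importlib.import_module(new_module), new_name)
        old_cls = getattr(importlib.import_module(old_module), old_name)
        assert old_cls is new_cls, f"{old_path} should alias {new_path}"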
diff --git a/tests/utils/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
similarity index 100%
rename from tests/utils/log/test_cloudwatch_task_handler.py
rename to tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
diff --git a/tests/providers/elasticsearch/log/__init__.py b/tests/providers/elasticsearch/log/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/elasticsearch/log/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/utils/log/elasticmock/__init__.py b/tests/providers/elasticsearch/log/elasticmock/__init__.py
similarity index 100%
rename from tests/utils/log/elasticmock/__init__.py
rename to tests/providers/elasticsearch/log/elasticmock/__init__.py
diff --git a/tests/utils/log/elasticmock/fake_elasticsearch.py b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
similarity index 100%
rename from tests/utils/log/elasticmock/fake_elasticsearch.py
rename to tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
diff --git a/tests/utils/log/elasticmock/utilities/__init__.py b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
similarity index 100%
rename from tests/utils/log/elasticmock/utilities/__init__.py
rename to tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
diff --git a/tests/utils/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
similarity index 99%
rename from tests/utils/log/test_es_task_handler.py
rename to tests/providers/elasticsearch/log/test_es_task_handler.py
index 8453ca8..bb82886 100644
--- a/tests/utils/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -30,8 +30,8 @@ from parameterized import parameterized
from airflow.configuration import conf
from airflow.models import DAG, TaskInstance
from airflow.operators.dummy_operator import DummyOperator
+from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
from airflow.utils import timezone
-from airflow.utils.log.es_task_handler import ElasticsearchTaskHandler
from airflow.utils.state import State
from airflow.utils.timezone import datetime
diff --git a/tests/providers/google/cloud/log/__init__.py b/tests/providers/google/cloud/log/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/google/cloud/log/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/utils/log/test_stackdriver_task_handler.py b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
similarity index 85%
rename from tests/utils/log/test_stackdriver_task_handler.py
rename to tests/providers/google/cloud/log/test_stackdriver_task_handler.py
index 9af7d2d..684a0a8 100644
--- a/tests/utils/log/test_stackdriver_task_handler.py
+++ b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
@@ -25,7 +25,7 @@ from google.cloud.logging.resource import Resource
from airflow.models import TaskInstance
from airflow.models.dag import DAG
from airflow.operators.dummy_operator import DummyOperator
-from airflow.utils.log.stackdriver_task_handler import StackdriverTaskHandler
+from airflow.providers.google.cloud.log.stackdriver_task_handler import StackdriverTaskHandler
from airflow.utils.state import State
@@ -36,8 +36,8 @@ def _create_list_response(messages, token):
class TestStackdriverLoggingHandlerStandalone(unittest.TestCase):
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
def test_should_pass_message_to_client(self, mock_client, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
@@ -78,8 +78,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
self.ti.state = State.RUNNING
self.addCleanup(self.dag.clear)
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
def test_should_set_labels(self, mock_client, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
@@ -100,8 +100,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
mock.ANY, 'test-message', labels=labels, resource=resource
)
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
def test_should_append_labels(self, mock_client, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
self.stackdriver_task_handler = StackdriverTaskHandler(
@@ -126,9 +126,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
mock.ANY, 'test-message', labels=labels, resource=resource
)
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
@mock.patch(
- 'airflow.utils.log.stackdriver_task_handler.gcp_logging.Client',
+ 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
**{'return_value.project': 'asf-project'} # type: ignore
)
def test_should_read_logs_for_all_try(self, mock_client, mock_get_creds_and_project_id):
@@ -147,9 +147,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
self.assertEqual(['MSG1\nMSG2'], logs)
self.assertEqual([{'end_of_log': True}], metadata)
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
@mock.patch( # type: ignore
- 'airflow.utils.log.stackdriver_task_handler.gcp_logging.Client',
+ 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
**{'return_value.project': 'asf-project'} # type: ignore
)
def test_should_read_logs_for_task_with_quote(self, mock_client, mock_get_creds_and_project_id):
@@ -168,9 +168,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
self.assertEqual(['MSG1\nMSG2'], logs)
self.assertEqual([{'end_of_log': True}], metadata)
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
@mock.patch(
- 'airflow.utils.log.stackdriver_task_handler.gcp_logging.Client',
+ 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
**{'return_value.project': 'asf-project'} # type: ignore
)
def test_should_read_logs_for_single_try(self, mock_client, mock_get_creds_and_project_id):
@@ -190,8 +190,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
self.assertEqual(['MSG1\nMSG2'], logs)
self.assertEqual([{'end_of_log': True}], metadata)
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
def test_should_read_logs_with_pagination(self, mock_client, mock_get_creds_and_project_id):
mock_client.return_value.list_entries.side_effect = [
_create_list_response(["MSG1", "MSG2"], "TOKEN1"),
@@ -213,8 +213,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
self.assertEqual(['MSG3\nMSG4'], logs)
self.assertEqual([{'end_of_log': True}], metadata2)
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
def test_should_read_logs_with_download(self, mock_client, mock_get_creds_and_project_id):
mock_client.return_value.list_entries.side_effect = [
_create_list_response(["MSG1", "MSG2"], "TOKEN1"),
@@ -227,9 +227,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
self.assertEqual(['MSG1\nMSG2\nMSG3\nMSG4'], logs)
self.assertEqual([{'end_of_log': True}], metadata1)
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
@mock.patch(
- 'airflow.utils.log.stackdriver_task_handler.gcp_logging.Client',
+ 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
**{'return_value.project': 'asf-project'} # type: ignore
)
def test_should_read_logs_with_custom_resources(self, mock_client, mock_get_creds_and_project_id):
@@ -267,8 +267,8 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
self.assertEqual(['TEXT\nTEXT'], logs)
self.assertEqual([{'end_of_log': True}], metadata)
- @mock.patch('airflow.utils.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch('airflow.utils.log.stackdriver_task_handler.gcp_logging.Client')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
def test_should_use_credentials(self, mock_client, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
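
Note that the @mock.patch targets move together with the import because patching replaces a name where it is looked up, not where it is defined. For illustration, a test of the relocated handler now patches the provider module path (sketch only; the test body is elided):

    from unittest import mock

    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
    def test_example(mock_client):
        # gcp_logging.Client is resolved inside the provider module,
        # so that is the path that must be patched
        ...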