This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit be04073226a99a7eb57f2eeac929263493105815 Author: Kamil BreguĊa <[email protected]> AuthorDate: Wed Feb 3 04:16:50 2021 +0100 Support google-cloud-logging` >=2.0.0 (#13801) (cherry picked from commit 0e8c77b93a5ca5ecfdcd1c4bd91f54846fc15d57) --- airflow/providers/google/ADDITIONAL_INFO.md | 1 + .../google/cloud/log/stackdriver_task_handler.py | 72 +++++-- setup.py | 2 +- .../cloud/log/test_stackdriver_task_handler.py | 225 +++++++++++++-------- 4 files changed, 200 insertions(+), 100 deletions(-) diff --git a/airflow/providers/google/ADDITIONAL_INFO.md b/airflow/providers/google/ADDITIONAL_INFO.md index 9cf9853..a363051 100644 --- a/airflow/providers/google/ADDITIONAL_INFO.md +++ b/airflow/providers/google/ADDITIONAL_INFO.md @@ -34,6 +34,7 @@ Details are covered in the UPDATING.md files for each library, but there are som | [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) | | [``google-cloud-dataproc``](https://pypi.org/project/google-cloud-dataproc/) | ``>=1.0.1,<2.0.0`` | ``>=2.2.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-dataproc/blob/master/UPGRADING.md) | | [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) | +| [``google-cloud-logging``](https://pypi.org/project/google-cloud-logging/) | ``>=1.14.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-logging/blob/master/UPGRADING.md) | | [``google-cloud-monitoring``](https://pypi.org/project/google-cloud-monitoring/) | ``>=0.34.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-monitoring/blob/master/UPGRADING.md) | | [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) | | [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) | diff --git a/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/airflow/providers/google/cloud/log/stackdriver_task_handler.py index be75fcd..5479185 100644 --- a/airflow/providers/google/cloud/log/stackdriver_task_handler.py +++ b/airflow/providers/google/cloud/log/stackdriver_task_handler.py @@ -21,9 +21,12 @@ from urllib.parse import urlencode from cached_property import cached_property from google.api_core.gapic_v1.client_info import ClientInfo +from google.auth.credentials import Credentials from google.cloud import logging as gcp_logging +from google.cloud.logging import Resource from google.cloud.logging.handlers.transports import BackgroundThreadTransport, Transport -from google.cloud.logging.resource import Resource +from google.cloud.logging_v2.services.logging_service_v2 import LoggingServiceV2Client +from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse from airflow import version from airflow.models import TaskInstance @@ -99,13 +102,19 @@ class StackdriverTaskHandler(logging.Handler): self.resource: Resource = resource self.labels: Optional[Dict[str, str]] = labels self.task_instance_labels: Optional[Dict[str, str]] = {} + self.task_instance_hostname = 'default-hostname' @cached_property - def _client(self) -> gcp_logging.Client: - """Google Cloud Library API client""" + def _credentials_and_project(self) -> Tuple[Credentials, str]: credentials, project = get_credentials_and_project_id( key_path=self.gcp_key_path, scopes=self.scopes, disable_logging=True ) + return credentials, project + + @property + def _client(self) -> gcp_logging.Client: + """The Cloud Library API client""" + credentials, project = self._credentials_and_project client = gcp_logging.Client( credentials=credentials, project=project, @@ -113,6 +122,16 @@ class StackdriverTaskHandler(logging.Handler): ) return client + @property + def _logging_service_client(self) -> LoggingServiceV2Client: + """The Cloud logging service v2 client.""" + credentials, _ = self._credentials_and_project + client = LoggingServiceV2Client( + credentials=credentials, + client_info=ClientInfo(client_library_version='airflow_v' + version.version), + ) + return client + @cached_property def _transport(self) -> Transport: """Object responsible for sending data to Stackdriver""" @@ -146,10 +165,11 @@ class StackdriverTaskHandler(logging.Handler): :type task_instance: :class:`airflow.models.TaskInstance` """ self.task_instance_labels = self._task_instance_to_labels(task_instance) + self.task_instance_hostname = task_instance.hostname def read( self, task_instance: TaskInstance, try_number: Optional[int] = None, metadata: Optional[Dict] = None - ) -> Tuple[List[str], List[Dict]]: + ) -> Tuple[List[Tuple[Tuple[str, str]]], List[Dict[str, str]]]: """ Read logs of given task instance from Stackdriver logging. @@ -160,12 +180,14 @@ class StackdriverTaskHandler(logging.Handler): :type try_number: Optional[int] :param metadata: log metadata. It is used for steaming log reading and auto-tailing. :type metadata: Dict - :return: a tuple of list of logs and list of metadata - :rtype: Tuple[List[str], List[Dict]] + :return: a tuple of ( + list of (one element tuple with two element tuple - hostname and logs) + and list of metadata) + :rtype: Tuple[List[Tuple[Tuple[str, str]]], List[Dict[str, str]]] """ if try_number is not None and try_number < 1: - logs = [f"Error fetching the logs. Try number {try_number} is invalid."] - return logs, [{"end_of_log": "true"}] + logs = f"Error fetching the logs. Try number {try_number} is invalid." + return [((self.task_instance_hostname, logs),)], [{"end_of_log": "true"}] if not metadata: metadata = {} @@ -188,7 +210,7 @@ class StackdriverTaskHandler(logging.Handler): if next_page_token: new_metadata['next_page_token'] = next_page_token - return [messages], [new_metadata] + return [((self.task_instance_hostname, messages),)], [new_metadata] def _prepare_log_filter(self, ti_labels: Dict[str, str]) -> str: """ @@ -210,9 +232,10 @@ class StackdriverTaskHandler(logging.Handler): escaped_value = value.replace("\\", "\\\\").replace('"', '\\"') return f'"{escaped_value}"' + _, project = self._credentials_and_project log_filters = [ f'resource.type={escale_label_value(self.resource.type)}', - f'logName="projects/{self._client.project}/logs/{self.name}"', + f'logName="projects/{project}/logs/{self.name}"', ] for key, value in self.resource.labels.items(): @@ -252,6 +275,8 @@ class StackdriverTaskHandler(logging.Handler): log_filter=log_filter, page_token=next_page_token ) messages.append(new_messages) + if not messages: + break end_of_log = True next_page_token = None @@ -271,15 +296,21 @@ class StackdriverTaskHandler(logging.Handler): :return: Downloaded logs and next page token :rtype: Tuple[str, str] """ - entries = self._client.list_entries(filter_=log_filter, page_token=page_token) - page = next(entries.pages) - next_page_token = entries.next_page_token + _, project = self._credentials_and_project + request = ListLogEntriesRequest( + resource_names=[f'projects/{project}'], + filter=log_filter, + page_token=page_token, + order_by='timestamp asc', + page_size=1000, + ) + response = self._logging_service_client.list_log_entries(request=request) + page: ListLogEntriesResponse = next(response.pages) messages = [] - for entry in page: - if "message" in entry.payload: - messages.append(entry.payload["message"]) - - return "\n".join(messages), next_page_token + for entry in page.entries: + if "message" in entry.json_payload: + messages.append(entry.json_payload["message"]) + return "\n".join(messages), page.next_page_token @classmethod def _task_instance_to_labels(cls, ti: TaskInstance) -> Dict[str, str]: @@ -315,7 +346,7 @@ class StackdriverTaskHandler(logging.Handler): :return: URL to the external log collection service :rtype: str """ - project_id = self._client.project + _, project_id = self._credentials_and_project ti_labels = self._task_instance_to_labels(task_instance) ti_labels[self.LABEL_TRY_NUMBER] = str(try_number) @@ -331,3 +362,6 @@ class StackdriverTaskHandler(logging.Handler): url = f"{self.LOG_VIEWER_BASE_URL}?{urlencode(url_query_string)}" return url + + def close(self) -> None: + self._transport.flush() diff --git a/setup.py b/setup.py index fa1e73a..7beb684 100644 --- a/setup.py +++ b/setup.py @@ -292,7 +292,7 @@ google = [ 'google-cloud-dlp>=0.11.0,<2.0.0', 'google-cloud-kms>=2.0.0,<3.0.0', 'google-cloud-language>=1.1.1,<2.0.0', - 'google-cloud-logging>=1.14.0,<2.0.0', + 'google-cloud-logging>=2.1.1,<3.0.0', 'google-cloud-memcache>=0.2.0', 'google-cloud-monitoring>=2.0.0,<3.0.0', 'google-cloud-os-login>=2.0.0,<3.0.0', diff --git a/tests/providers/google/cloud/log/test_stackdriver_task_handler.py b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py index 4159e9e..b4dbf69 100644 --- a/tests/providers/google/cloud/log/test_stackdriver_task_handler.py +++ b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py @@ -21,7 +21,8 @@ from datetime import datetime from unittest import mock from urllib.parse import parse_qs, urlparse -from google.cloud.logging.resource import Resource +from google.cloud.logging import Resource +from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse, LogEntry from airflow.models import TaskInstance from airflow.models.dag import DAG @@ -30,15 +31,27 @@ from airflow.providers.google.cloud.log.stackdriver_task_handler import Stackdri from airflow.utils.state import State -def _create_list_response(messages, token): - page = [mock.MagicMock(payload={"message": message}) for message in messages] - return mock.MagicMock(pages=(n for n in [page]), next_page_token=token) +def _create_list_log_entries_response_mock(messages, token): + return ListLogEntriesResponse( + entries=[LogEntry(json_payload={"message": message}) for message in messages], next_page_token=token + ) + + +def _remove_stackdriver_handlers(): + for handler_ref in reversed(logging._handlerList[:]): + handler = handler_ref() + if not isinstance(handler, StackdriverTaskHandler): + continue + logging._removeHandlerRef(handler_ref) + del handler class TestStackdriverLoggingHandlerStandalone(unittest.TestCase): @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client') def test_should_pass_message_to_client(self, mock_client, mock_get_creds_and_project_id): + self.addCleanup(_remove_stackdriver_handlers) + mock_get_creds_and_project_id.return_value = ('creds', 'project_id') transport_type = mock.MagicMock() @@ -69,6 +82,7 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase): self.ti.try_number = 1 self.ti.state = State.RUNNING self.addCleanup(self.dag.clear) + self.addCleanup(_remove_stackdriver_handlers) @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client') @@ -118,107 +132,153 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase): ) @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') - @mock.patch( - 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client', - **{'return_value.project': 'asf-project'}, # type: ignore - ) + @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client') def test_should_read_logs_for_all_try(self, mock_client, mock_get_creds_and_project_id): - mock_client.return_value.list_entries.return_value = _create_list_response(["MSG1", "MSG2"], None) + mock_client.return_value.list_log_entries.return_value.pages = iter( + [_create_list_log_entries_response_mock(["MSG1", "MSG2"], None)] + ) mock_get_creds_and_project_id.return_value = ('creds', 'project_id') logs, metadata = self.stackdriver_task_handler.read(self.ti) - mock_client.return_value.list_entries.assert_called_once_with( - filter_='resource.type="global"\n' - 'logName="projects/asf-project/logs/airflow"\n' - 'labels.task_id="task_for_testing_file_log_handler"\n' - 'labels.dag_id="dag_for_testing_file_task_handler"\n' - 'labels.execution_date="2016-01-01T00:00:00+00:00"', - page_token=None, + mock_client.return_value.list_log_entries.assert_called_once_with( + request=ListLogEntriesRequest( + resource_names=["projects/project_id"], + filter=( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'labels.task_id="task_for_testing_file_log_handler"\n' + 'labels.dag_id="dag_for_testing_file_task_handler"\n' + 'labels.execution_date="2016-01-01T00:00:00+00:00"' + ), + order_by='timestamp asc', + page_size=1000, + page_token=None, + ) ) - assert ['MSG1\nMSG2'] == logs + assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs assert [{'end_of_log': True}] == metadata @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') - @mock.patch( - 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client', - **{'return_value.project': 'asf-project'}, # type: ignore - ) + @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client') def test_should_read_logs_for_task_with_quote(self, mock_client, mock_get_creds_and_project_id): - mock_client.return_value.list_entries.return_value = _create_list_response(["MSG1", "MSG2"], None) + mock_client.return_value.list_log_entries.return_value.pages = iter( + [_create_list_log_entries_response_mock(["MSG1", "MSG2"], None)] + ) mock_get_creds_and_project_id.return_value = ('creds', 'project_id') self.ti.task_id = "K\"OT" logs, metadata = self.stackdriver_task_handler.read(self.ti) - mock_client.return_value.list_entries.assert_called_once_with( - filter_='resource.type="global"\n' - 'logName="projects/asf-project/logs/airflow"\n' - 'labels.task_id="K\\"OT"\n' - 'labels.dag_id="dag_for_testing_file_task_handler"\n' - 'labels.execution_date="2016-01-01T00:00:00+00:00"', - page_token=None, + mock_client.return_value.list_log_entries.assert_called_once_with( + request=ListLogEntriesRequest( + resource_names=["projects/project_id"], + filter=( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'labels.task_id="K\\"OT"\n' + 'labels.dag_id="dag_for_testing_file_task_handler"\n' + 'labels.execution_date="2016-01-01T00:00:00+00:00"' + ), + order_by='timestamp asc', + page_size=1000, + page_token=None, + ) ) - assert ['MSG1\nMSG2'] == logs + assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs assert [{'end_of_log': True}] == metadata @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') - @mock.patch( - 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client', - **{'return_value.project': 'asf-project'}, # type: ignore - ) + @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client') def test_should_read_logs_for_single_try(self, mock_client, mock_get_creds_and_project_id): - mock_client.return_value.list_entries.return_value = _create_list_response(["MSG1", "MSG2"], None) + mock_client.return_value.list_log_entries.return_value.pages = iter( + [_create_list_log_entries_response_mock(["MSG1", "MSG2"], None)] + ) mock_get_creds_and_project_id.return_value = ('creds', 'project_id') logs, metadata = self.stackdriver_task_handler.read(self.ti, 3) - mock_client.return_value.list_entries.assert_called_once_with( - filter_='resource.type="global"\n' - 'logName="projects/asf-project/logs/airflow"\n' - 'labels.task_id="task_for_testing_file_log_handler"\n' - 'labels.dag_id="dag_for_testing_file_task_handler"\n' - 'labels.execution_date="2016-01-01T00:00:00+00:00"\n' - 'labels.try_number="3"', - page_token=None, + mock_client.return_value.list_log_entries.assert_called_once_with( + request=ListLogEntriesRequest( + resource_names=["projects/project_id"], + filter=( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'labels.task_id="task_for_testing_file_log_handler"\n' + 'labels.dag_id="dag_for_testing_file_task_handler"\n' + 'labels.execution_date="2016-01-01T00:00:00+00:00"\n' + 'labels.try_number="3"' + ), + order_by='timestamp asc', + page_size=1000, + page_token=None, + ) ) - assert ['MSG1\nMSG2'] == logs + assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs assert [{'end_of_log': True}] == metadata @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') - @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client') + @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client') def test_should_read_logs_with_pagination(self, mock_client, mock_get_creds_and_project_id): - mock_client.return_value.list_entries.side_effect = [ - _create_list_response(["MSG1", "MSG2"], "TOKEN1"), - _create_list_response(["MSG3", "MSG4"], None), + mock_client.return_value.list_log_entries.side_effect = [ + mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG1", "MSG2"], "TOKEN1")])), + mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG3", "MSG4"], None)])), ] mock_get_creds_and_project_id.return_value = ('creds', 'project_id') logs, metadata1 = self.stackdriver_task_handler.read(self.ti, 3) - mock_client.return_value.list_entries.assert_called_once_with(filter_=mock.ANY, page_token=None) - assert ['MSG1\nMSG2'] == logs + mock_client.return_value.list_log_entries.assert_called_once_with( + request=ListLogEntriesRequest( + resource_names=["projects/project_id"], + filter=( + '''resource.type="global" +logName="projects/project_id/logs/airflow" +labels.task_id="task_for_testing_file_log_handler" +labels.dag_id="dag_for_testing_file_task_handler" +labels.execution_date="2016-01-01T00:00:00+00:00" +labels.try_number="3"''' + ), + order_by='timestamp asc', + page_size=1000, + page_token=None, + ) + ) + assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs assert [{'end_of_log': False, 'next_page_token': 'TOKEN1'}] == metadata1 - mock_client.return_value.list_entries.return_value.next_page_token = None + mock_client.return_value.list_log_entries.return_value.next_page_token = None logs, metadata2 = self.stackdriver_task_handler.read(self.ti, 3, metadata1[0]) - mock_client.return_value.list_entries.assert_called_with(filter_=mock.ANY, page_token="TOKEN1") - assert ['MSG3\nMSG4'] == logs + + mock_client.return_value.list_log_entries.assert_called_with( + request=ListLogEntriesRequest( + resource_names=["projects/project_id"], + filter=( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'labels.task_id="task_for_testing_file_log_handler"\n' + 'labels.dag_id="dag_for_testing_file_task_handler"\n' + 'labels.execution_date="2016-01-01T00:00:00+00:00"\n' + 'labels.try_number="3"' + ), + order_by='timestamp asc', + page_size=1000, + page_token="TOKEN1", + ) + ) + assert [(('default-hostname', 'MSG3\nMSG4'),)] == logs assert [{'end_of_log': True}] == metadata2 @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') - @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client') + @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client') def test_should_read_logs_with_download(self, mock_client, mock_get_creds_and_project_id): - mock_client.return_value.list_entries.side_effect = [ - _create_list_response(["MSG1", "MSG2"], "TOKEN1"), - _create_list_response(["MSG3", "MSG4"], None), + mock_client.return_value.list_log_entries.side_effect = [ + mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG1", "MSG2"], "TOKEN1")])), + mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG3", "MSG4"], None)])), ] mock_get_creds_and_project_id.return_value = ('creds', 'project_id') logs, metadata1 = self.stackdriver_task_handler.read(self.ti, 3, {'download_logs': True}) - assert ['MSG1\nMSG2\nMSG3\nMSG4'] == logs + assert [(('default-hostname', 'MSG1\nMSG2\nMSG3\nMSG4'),)] == logs assert [{'end_of_log': True}] == metadata1 @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') - @mock.patch( - 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client', - **{'return_value.project': 'asf-project'}, # type: ignore - ) + @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client') def test_should_read_logs_with_custom_resources(self, mock_client, mock_get_creds_and_project_id): mock_get_creds_and_project_id.return_value = ('creds', 'project_id') resource = Resource( @@ -226,31 +286,37 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase): labels={ "environment.name": 'test-instancce', "location": 'europpe-west-3', - "project_id": "asf-project", + "project_id": "project_id", }, ) self.stackdriver_task_handler = StackdriverTaskHandler( transport=self.transport_mock, resource=resource ) - entry = mock.MagicMock(payload={"message": "TEXT"}) - page = [entry, entry] - mock_client.return_value.list_entries.return_value.pages = (n for n in [page]) - mock_client.return_value.list_entries.return_value.next_page_token = None + entry = mock.MagicMock(json_payload={"message": "TEXT"}) + page = mock.MagicMock(entries=[entry, entry], next_page_token=None) + mock_client.return_value.list_log_entries.return_value.pages = (n for n in [page]) logs, metadata = self.stackdriver_task_handler.read(self.ti) - mock_client.return_value.list_entries.assert_called_once_with( - filter_='resource.type="cloud_composer_environment"\n' - 'logName="projects/asf-project/logs/airflow"\n' - 'resource.labels."environment.name"="test-instancce"\n' - 'resource.labels.location="europpe-west-3"\n' - 'resource.labels.project_id="asf-project"\n' - 'labels.task_id="task_for_testing_file_log_handler"\n' - 'labels.dag_id="dag_for_testing_file_task_handler"\n' - 'labels.execution_date="2016-01-01T00:00:00+00:00"', - page_token=None, + mock_client.return_value.list_log_entries.assert_called_once_with( + request=ListLogEntriesRequest( + resource_names=["projects/project_id"], + filter=( + 'resource.type="cloud_composer_environment"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'resource.labels."environment.name"="test-instancce"\n' + 'resource.labels.location="europpe-west-3"\n' + 'resource.labels.project_id="project_id"\n' + 'labels.task_id="task_for_testing_file_log_handler"\n' + 'labels.dag_id="dag_for_testing_file_task_handler"\n' + 'labels.execution_date="2016-01-01T00:00:00+00:00"' + ), + order_by='timestamp asc', + page_size=1000, + page_token=None, + ) ) - assert ['TEXT\nTEXT'] == logs + assert [(('default-hostname', 'TEXT\nTEXT'),)] == logs assert [{'end_of_log': True}] == metadata @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') @@ -278,10 +344,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase): assert mock_client.return_value == client @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id') - @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client') + @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client') def test_should_return_valid_external_url(self, mock_client, mock_get_creds_and_project_id): mock_get_creds_and_project_id.return_value = ('creds', 'project_id') - mock_client.return_value.project = 'project_id' stackdriver_task_handler = StackdriverTaskHandler( gcp_key_path="KEY_PATH",
