This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit acfe4ae7988476f79cd63f21d3fd8de4336daa42 Author: Kamil Breguła <[email protected]> AuthorDate: Tue Feb 2 07:01:55 2021 +0100 Support google-cloud-monitoring>=2.0.0 (#13769) (cherry picked from commit d2efb33239d36e58fb69066fd23779724cb11a90) --- airflow/providers/google/ADDITIONAL_INFO.md | 1 + .../cloud/example_dags/example_stackdriver.py | 82 +++++-- .../providers/google/cloud/hooks/stackdriver.py | 133 +++++------ .../google/cloud/operators/stackdriver.py | 12 +- setup.py | 2 +- .../google/cloud/hooks/test_stackdriver.py | 242 +++++++++++---------- .../google/cloud/operators/test_stackdriver.py | 49 ++++- 7 files changed, 302 insertions(+), 219 deletions(-) diff --git a/airflow/providers/google/ADDITIONAL_INFO.md b/airflow/providers/google/ADDITIONAL_INFO.md index 16a6683..9cf9853 100644 --- a/airflow/providers/google/ADDITIONAL_INFO.md +++ b/airflow/providers/google/ADDITIONAL_INFO.md @@ -34,6 +34,7 @@ Details are covered in the UPDATING.md files for each library, but there are som | [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) | | [``google-cloud-dataproc``](https://pypi.org/project/google-cloud-dataproc/) | ``>=1.0.1,<2.0.0`` | ``>=2.2.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-dataproc/blob/master/UPGRADING.md) | | [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) | +| [``google-cloud-monitoring``](https://pypi.org/project/google-cloud-monitoring/) | ``>=0.34.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-monitoring/blob/master/UPGRADING.md) | | [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | 
[`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) | | [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) | | [``google-cloud-tasks``](https://pypi.org/project/google-cloud-tasks/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-tasks/blob/master/UPGRADING.md) | diff --git a/airflow/providers/google/cloud/example_dags/example_stackdriver.py b/airflow/providers/google/cloud/example_dags/example_stackdriver.py index 68ac978..9c418b7 100644 --- a/airflow/providers/google/cloud/example_dags/example_stackdriver.py +++ b/airflow/providers/google/cloud/example_dags/example_stackdriver.py @@ -21,6 +21,7 @@ Example Airflow DAG for Google Cloud Stackdriver service. """ import json +import os from airflow import models from airflow.providers.google.cloud.operators.stackdriver import ( @@ -37,56 +38,80 @@ from airflow.providers.google.cloud.operators.stackdriver import ( ) from airflow.utils.dates import days_ago +PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") + TEST_ALERT_POLICY_1 = { "combiner": "OR", - "name": "projects/sd-project/alertPolicies/12345", - "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"}, "enabled": True, - "displayName": "test alert 1", + "display_name": "test alert 1", "conditions": [ { - "conditionThreshold": { + "condition_threshold": { + "filter": ( + 'metric.label.state="blocked" AND ' + 'metric.type="agent.googleapis.com/processes/count_by_state" ' + 'AND resource.type="gce_instance"' + ), "comparison": "COMPARISON_GT", - "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}], + "threshold_value": 100, + "duration": {'seconds': 900}, + "trigger": {"percent": 0}, + "aggregations": [ + { + "alignment_period": {'seconds': 60}, + 
"per_series_aligner": "ALIGN_MEAN", + "cross_series_reducer": "REDUCE_MEAN", + "group_by_fields": ["project", "resource.label.instance_id", "resource.label.zone"], + } + ], }, - "displayName": "Condition display", - "name": "projects/sd-project/alertPolicies/123/conditions/456", + "display_name": "test_alert_policy_1", } ], } TEST_ALERT_POLICY_2 = { "combiner": "OR", - "name": "projects/sd-project/alertPolicies/6789", - "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"}, "enabled": False, - "displayName": "test alert 2", + "display_name": "test alert 2", "conditions": [ { - "conditionThreshold": { + "condition_threshold": { + "filter": ( + 'metric.label.state="blocked" AND ' + 'metric.type="agent.googleapis.com/processes/count_by_state" AND ' + 'resource.type="gce_instance"' + ), "comparison": "COMPARISON_GT", - "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}], + "threshold_value": 100, + "duration": {'seconds': 900}, + "trigger": {"percent": 0}, + "aggregations": [ + { + "alignment_period": {'seconds': 60}, + "per_series_aligner": "ALIGN_MEAN", + "cross_series_reducer": "REDUCE_MEAN", + "group_by_fields": ["project", "resource.label.instance_id", "resource.label.zone"], + } + ], }, - "displayName": "Condition display", - "name": "projects/sd-project/alertPolicies/456/conditions/789", + "display_name": "test_alert_policy_2", } ], } TEST_NOTIFICATION_CHANNEL_1 = { - "displayName": "channel1", + "display_name": "channel1", "enabled": True, "labels": {"auth_token": "top-secret", "channel_name": "#channel"}, - "name": "projects/sd-project/notificationChannels/12345", - "type": "slack", + "type_": "slack", } TEST_NOTIFICATION_CHANNEL_2 = { - "displayName": "channel2", + "display_name": "channel2", "enabled": False, "labels": {"auth_token": "top-secret", "channel_name": "#channel"}, - "name": "projects/sd-project/notificationChannels/6789", - "type": "slack", + "type_": "slack", } with models.DAG( @@ 
-150,18 +175,29 @@ with models.DAG( # [START howto_operator_gcp_stackdriver_delete_notification_channel] delete_notification_channel = StackdriverDeleteNotificationChannelOperator( task_id='delete-notification-channel', - name='test-channel', + name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}", ) # [END howto_operator_gcp_stackdriver_delete_notification_channel] + delete_notification_channel_2 = StackdriverDeleteNotificationChannelOperator( + task_id='delete-notification-channel-2', + name="{{ task_instance.xcom_pull('list-notification-channel')[1]['name'] }}", + ) + # [START howto_operator_gcp_stackdriver_delete_alert_policy] delete_alert_policy = StackdriverDeleteAlertOperator( task_id='delete-alert-policy', - name='test-alert', + name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}", ) # [END howto_operator_gcp_stackdriver_delete_alert_policy] + delete_alert_policy_2 = StackdriverDeleteAlertOperator( + task_id='delete-alert-policy-2', + name="{{ task_instance.xcom_pull('list-alert-policies')[1]['name'] }}", + ) + create_notification_channel >> enable_notification_channel >> disable_notification_channel disable_notification_channel >> list_notification_channel >> create_alert_policy create_alert_policy >> enable_alert_policy >> disable_alert_policy >> list_alert_policies - list_alert_policies >> delete_notification_channel >> delete_alert_policy + list_alert_policies >> delete_notification_channel >> delete_notification_channel_2 + delete_notification_channel_2 >> delete_alert_policy >> delete_alert_policy_2 diff --git a/airflow/providers/google/cloud/hooks/stackdriver.py b/airflow/providers/google/cloud/hooks/stackdriver.py index 9da1afa..04dc329 100644 --- a/airflow/providers/google/cloud/hooks/stackdriver.py +++ b/airflow/providers/google/cloud/hooks/stackdriver.py @@ -24,7 +24,8 @@ from typing import Any, Optional, Sequence, Union from google.api_core.exceptions import InvalidArgument from 
google.api_core.gapic_v1.method import DEFAULT from google.cloud import monitoring_v3 -from google.protobuf.json_format import MessageToDict, MessageToJson, Parse +from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel +from google.protobuf.field_mask_pb2 import FieldMask from googleapiclient.errors import HttpError from airflow.exceptions import AirflowException @@ -110,18 +111,20 @@ class StackdriverHook(GoogleBaseHook): """ client = self._get_policy_client() policies_ = client.list_alert_policies( - name=f'projects/{project_id}', - filter_=filter_, - order_by=order_by, - page_size=page_size, + request={ + 'name': f'projects/{project_id}', + 'filter': filter_, + 'order_by': order_by, + 'page_size': page_size, + }, retry=retry, timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) if format_ == "dict": - return [MessageToDict(policy) for policy in policies_] + return [AlertPolicy.to_dict(policy) for policy in policies_] elif format_ == "json": - return [MessageToJson(policy) for policy in policies_] + return [AlertPolicy.to_json(policy) for policy in policies_] else: return policies_ @@ -138,12 +141,14 @@ class StackdriverHook(GoogleBaseHook): client = self._get_policy_client() policies_ = self.list_alert_policies(project_id=project_id, filter_=filter_) for policy in policies_: - if policy.enabled.value != bool(new_state): - policy.enabled.value = bool(new_state) - mask = monitoring_v3.types.field_mask_pb2.FieldMask() - mask.paths.append('enabled') # pylint: disable=no-member + if policy.enabled != bool(new_state): + policy.enabled = bool(new_state) + mask = FieldMask(paths=['enabled']) client.update_alert_policy( - alert_policy=policy, update_mask=mask, retry=retry, timeout=timeout, metadata=metadata + request={'alert_policy': policy, 'update_mask': mask}, + retry=retry, + timeout=timeout, + metadata=metadata or (), ) @GoogleBaseHook.fallback_to_default_project_id @@ -265,40 +270,39 @@ class StackdriverHook(GoogleBaseHook): ] 
policies_ = [] channels = [] - - for channel in record["channels"]: - channel_json = json.dumps(channel) - channels.append(Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel())) - for policy in record["policies"]: - policy_json = json.dumps(policy) - policies_.append(Parse(policy_json, monitoring_v3.types.alert_pb2.AlertPolicy())) + for channel in record.get("channels", []): + channels.append(NotificationChannel(**channel)) + for policy in record.get("policies", []): + policies_.append(AlertPolicy(**policy)) channel_name_map = {} for channel in channels: channel.verification_status = ( - monitoring_v3.enums.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED + monitoring_v3.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED ) if channel.name in existing_channels: channel_client.update_notification_channel( - notification_channel=channel, retry=retry, timeout=timeout, metadata=metadata + request={'notification_channel': channel}, + retry=retry, + timeout=timeout, + metadata=metadata or (), ) else: old_name = channel.name - channel.ClearField('name') + channel.name = None new_channel = channel_client.create_notification_channel( - name=f'projects/{project_id}', - notification_channel=channel, + request={'name': f'projects/{project_id}', 'notification_channel': channel}, retry=retry, timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) channel_name_map[old_name] = new_channel.name for policy in policies_: - policy.ClearField('creation_record') - policy.ClearField('mutation_record') + policy.creation_record = None + policy.mutation_record = None for i, channel in enumerate(policy.notification_channels): new_channel = channel_name_map.get(channel) @@ -308,20 +312,22 @@ class StackdriverHook(GoogleBaseHook): if policy.name in existing_policies: try: policy_client.update_alert_policy( - alert_policy=policy, retry=retry, timeout=timeout, metadata=metadata + request={'alert_policy': policy}, + 
retry=retry, + timeout=timeout, + metadata=metadata or (), ) except InvalidArgument: pass else: - policy.ClearField('name') + policy.name = None for condition in policy.conditions: - condition.ClearField('name') + condition.name = None policy_client.create_alert_policy( - name=f'projects/{project_id}', - alert_policy=policy, + request={'name': f'projects/{project_id}', 'alert_policy': policy}, retry=retry, timeout=timeout, - metadata=None, + metadata=metadata or (), ) def delete_alert_policy( @@ -349,7 +355,9 @@ class StackdriverHook(GoogleBaseHook): """ policy_client = self._get_policy_client() try: - policy_client.delete_alert_policy(name=name, retry=retry, timeout=timeout, metadata=metadata) + policy_client.delete_alert_policy( + request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or () + ) except HttpError as err: raise AirflowException(f'Delete alerting policy failed. Error was {err.content}') @@ -405,18 +413,20 @@ class StackdriverHook(GoogleBaseHook): """ client = self._get_channel_client() channels = client.list_notification_channels( - name=f'projects/{project_id}', - filter_=filter_, - order_by=order_by, - page_size=page_size, + request={ + 'name': f'projects/{project_id}', + 'filter': filter_, + 'order_by': order_by, + 'page_size': page_size, + }, retry=retry, timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) if format_ == "dict": - return [MessageToDict(channel) for channel in channels] + return [NotificationChannel.to_dict(channel) for channel in channels] elif format_ == "json": - return [MessageToJson(channel) for channel in channels] + return [NotificationChannel.to_json(channel) for channel in channels] else: return channels @@ -431,18 +441,18 @@ class StackdriverHook(GoogleBaseHook): metadata: Optional[str] = None, ) -> None: client = self._get_channel_client() - channels = client.list_notification_channels(name=f'projects/{project_id}', filter_=filter_) + channels = client.list_notification_channels( + 
request={'name': f'projects/{project_id}', 'filter': filter_} + ) for channel in channels: - if channel.enabled.value != bool(new_state): - channel.enabled.value = bool(new_state) - mask = monitoring_v3.types.field_mask_pb2.FieldMask() - mask.paths.append('enabled') # pylint: disable=no-member + if channel.enabled != bool(new_state): + channel.enabled = bool(new_state) + mask = FieldMask(paths=['enabled']) client.update_notification_channel( - notification_channel=channel, - update_mask=mask, + request={'notification_channel': channel, 'update_mask': mask}, retry=retry, timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) @GoogleBaseHook.fallback_to_default_project_id @@ -518,7 +528,7 @@ class StackdriverHook(GoogleBaseHook): new_state=False, retry=retry, timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) @GoogleBaseHook.fallback_to_default_project_id @@ -562,29 +572,28 @@ class StackdriverHook(GoogleBaseHook): channel_name_map = {} for channel in record["channels"]: - channel_json = json.dumps(channel) - channels_list.append( - Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel()) - ) + channels_list.append(NotificationChannel(**channel)) for channel in channels_list: channel.verification_status = ( - monitoring_v3.enums.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED + monitoring_v3.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED ) if channel.name in existing_channels: channel_client.update_notification_channel( - notification_channel=channel, retry=retry, timeout=timeout, metadata=metadata + request={'notification_channel': channel}, + retry=retry, + timeout=timeout, + metadata=metadata or (), ) else: old_name = channel.name - channel.ClearField('name') + channel.name = None new_channel = channel_client.create_notification_channel( - name=f'projects/{project_id}', - notification_channel=channel, + request={'name': f'projects/{project_id}', 
'notification_channel': channel}, retry=retry, timeout=timeout, - metadata=metadata, + metadata=metadata or (), ) channel_name_map[old_name] = new_channel.name @@ -616,7 +625,7 @@ class StackdriverHook(GoogleBaseHook): channel_client = self._get_channel_client() try: channel_client.delete_notification_channel( - name=name, retry=retry, timeout=timeout, metadata=metadata + request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or () ) except HttpError as err: raise AirflowException(f'Delete notification channel failed. Error was {err.content}') diff --git a/airflow/providers/google/cloud/operators/stackdriver.py b/airflow/providers/google/cloud/operators/stackdriver.py index dc86466..7289b12 100644 --- a/airflow/providers/google/cloud/operators/stackdriver.py +++ b/airflow/providers/google/cloud/operators/stackdriver.py @@ -19,6 +19,7 @@ from typing import Optional, Sequence, Union from google.api_core.gapic_v1.method import DEFAULT +from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel from airflow.models import BaseOperator from airflow.providers.google.cloud.hooks.stackdriver import StackdriverHook @@ -125,7 +126,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator): def execute(self, context): self.log.info( - 'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d', + 'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s', self.project_id, self.format_, self.filter_, @@ -139,7 +140,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator): impersonation_chain=self.impersonation_chain, ) - return self.hook.list_alert_policies( + result = self.hook.list_alert_policies( project_id=self.project_id, format_=self.format_, filter_=self.filter_, @@ -149,6 +150,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator): timeout=self.timeout, metadata=self.metadata, ) + return [AlertPolicy.to_dict(policy) for policy in result] class 
StackdriverEnableAlertPoliciesOperator(BaseOperator): @@ -614,7 +616,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator): def execute(self, context): self.log.info( - 'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d', + 'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s', self.project_id, self.format_, self.filter_, @@ -627,7 +629,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator): delegate_to=self.delegate_to, impersonation_chain=self.impersonation_chain, ) - return self.hook.list_notification_channels( + channels = self.hook.list_notification_channels( format_=self.format_, project_id=self.project_id, filter_=self.filter_, @@ -637,6 +639,8 @@ class StackdriverListNotificationChannelsOperator(BaseOperator): timeout=self.timeout, metadata=self.metadata, ) + result = [NotificationChannel.to_dict(channel) for channel in channels] + return result class StackdriverEnableNotificationChannelsOperator(BaseOperator): diff --git a/setup.py b/setup.py index 0f40d88..fa1e73a 100644 --- a/setup.py +++ b/setup.py @@ -294,7 +294,7 @@ google = [ 'google-cloud-language>=1.1.1,<2.0.0', 'google-cloud-logging>=1.14.0,<2.0.0', 'google-cloud-memcache>=0.2.0', - 'google-cloud-monitoring>=0.34.0,<2.0.0', + 'google-cloud-monitoring>=2.0.0,<3.0.0', 'google-cloud-os-login>=2.0.0,<3.0.0', 'google-cloud-pubsub>=2.0.0,<3.0.0', 'google-cloud-redis>=2.0.0,<3.0.0', diff --git a/tests/providers/google/cloud/hooks/test_stackdriver.py b/tests/providers/google/cloud/hooks/test_stackdriver.py index 6892d05..10a3097 100644 --- a/tests/providers/google/cloud/hooks/test_stackdriver.py +++ b/tests/providers/google/cloud/hooks/test_stackdriver.py @@ -21,8 +21,8 @@ import unittest from unittest import mock from google.api_core.gapic_v1.method import DEFAULT -from google.cloud import monitoring_v3 -from google.protobuf.json_format import ParseDict +from google.cloud.monitoring_v3 import 
AlertPolicy, NotificationChannel +from google.protobuf.field_mask_pb2 import FieldMask from airflow.providers.google.cloud.hooks import stackdriver @@ -32,16 +32,15 @@ TEST_FILTER = "filter" TEST_ALERT_POLICY_1 = { "combiner": "OR", "name": "projects/sd-project/alertPolicies/12345", - "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"}, "enabled": True, - "displayName": "test display", + "display_name": "test display", "conditions": [ { - "conditionThreshold": { + "condition_threshold": { "comparison": "COMPARISON_GT", - "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}], + "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}], }, - "displayName": "Condition display", + "display_name": "Condition display", "name": "projects/sd-project/alertPolicies/123/conditions/456", } ], @@ -50,35 +49,34 @@ TEST_ALERT_POLICY_1 = { TEST_ALERT_POLICY_2 = { "combiner": "OR", "name": "projects/sd-project/alertPolicies/6789", - "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"}, "enabled": False, - "displayName": "test display", + "display_name": "test display", "conditions": [ { - "conditionThreshold": { + "condition_threshold": { "comparison": "COMPARISON_GT", - "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}], + "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}], }, - "displayName": "Condition display", + "display_name": "Condition display", "name": "projects/sd-project/alertPolicies/456/conditions/789", } ], } TEST_NOTIFICATION_CHANNEL_1 = { - "displayName": "sd", + "display_name": "sd", "enabled": True, "labels": {"auth_token": "top-secret", "channel_name": "#channel"}, "name": "projects/sd-project/notificationChannels/12345", - "type": "slack", + "type_": "slack", } TEST_NOTIFICATION_CHANNEL_2 = { - "displayName": "sd", + "display_name": "sd", "enabled": False, "labels": 
{"auth_token": "top-secret", "channel_name": "#channel"}, "name": "projects/sd-project/notificationChannels/6789", - "type": "slack", + "type_": "slack", } @@ -96,13 +94,10 @@ class TestStackdriverHookMethods(unittest.TestCase): project_id=PROJECT_ID, ) method.assert_called_once_with( - name=f'projects/{PROJECT_ID}', - filter_=TEST_FILTER, + request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None), retry=DEFAULT, timeout=DEFAULT, - order_by=None, - page_size=None, - metadata=None, + metadata=(), ) @mock.patch( @@ -113,8 +108,8 @@ class TestStackdriverHookMethods(unittest.TestCase): def test_stackdriver_enable_alert_policy(self, mock_policy_client, mock_get_creds_and_project_id): hook = stackdriver.StackdriverHook() - alert_policy_enabled = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy()) - alert_policy_disabled = ParseDict(TEST_ALERT_POLICY_2, monitoring_v3.types.alert_pb2.AlertPolicy()) + alert_policy_enabled = AlertPolicy(**TEST_ALERT_POLICY_1) + alert_policy_disabled = AlertPolicy(**TEST_ALERT_POLICY_2) alert_policies = [alert_policy_enabled, alert_policy_disabled] @@ -124,23 +119,18 @@ class TestStackdriverHookMethods(unittest.TestCase): project_id=PROJECT_ID, ) mock_policy_client.return_value.list_alert_policies.assert_called_once_with( - name=f'projects/{PROJECT_ID}', - filter_=TEST_FILTER, + request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None), retry=DEFAULT, timeout=DEFAULT, - order_by=None, - page_size=None, - metadata=None, + metadata=(), ) - mask = monitoring_v3.types.field_mask_pb2.FieldMask() - alert_policy_disabled.enabled.value = True # pylint: disable=no-member - mask.paths.append('enabled') # pylint: disable=no-member + mask = FieldMask(paths=["enabled"]) + alert_policy_disabled.enabled = True # pylint: disable=no-member mock_policy_client.return_value.update_alert_policy.assert_called_once_with( - alert_policy=alert_policy_disabled, - 
update_mask=mask, + request=dict(alert_policy=alert_policy_disabled, update_mask=mask), retry=DEFAULT, timeout=DEFAULT, - metadata=None, + metadata=(), ) @mock.patch( @@ -150,8 +140,8 @@ class TestStackdriverHookMethods(unittest.TestCase): @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client') def test_stackdriver_disable_alert_policy(self, mock_policy_client, mock_get_creds_and_project_id): hook = stackdriver.StackdriverHook() - alert_policy_enabled = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy()) - alert_policy_disabled = ParseDict(TEST_ALERT_POLICY_2, monitoring_v3.types.alert_pb2.AlertPolicy()) + alert_policy_enabled = AlertPolicy(**TEST_ALERT_POLICY_1) + alert_policy_disabled = AlertPolicy(**TEST_ALERT_POLICY_2) mock_policy_client.return_value.list_alert_policies.return_value = [ alert_policy_enabled, @@ -162,23 +152,18 @@ class TestStackdriverHookMethods(unittest.TestCase): project_id=PROJECT_ID, ) mock_policy_client.return_value.list_alert_policies.assert_called_once_with( - name=f'projects/{PROJECT_ID}', - filter_=TEST_FILTER, + request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None), retry=DEFAULT, timeout=DEFAULT, - order_by=None, - page_size=None, - metadata=None, + metadata=(), ) - mask = monitoring_v3.types.field_mask_pb2.FieldMask() - alert_policy_enabled.enabled.value = False # pylint: disable=no-member - mask.paths.append('enabled') # pylint: disable=no-member + mask = FieldMask(paths=["enabled"]) + alert_policy_enabled.enabled = False # pylint: disable=no-member mock_policy_client.return_value.update_alert_policy.assert_called_once_with( - alert_policy=alert_policy_enabled, - update_mask=mask, + request=dict(alert_policy=alert_policy_enabled, update_mask=mask), retry=DEFAULT, timeout=DEFAULT, - metadata=None, + metadata=(), ) @mock.patch( @@ -191,8 +176,8 @@ class TestStackdriverHookMethods(unittest.TestCase): self, mock_channel_client, 
mock_policy_client, mock_get_creds_and_project_id ): hook = stackdriver.StackdriverHook() - existing_alert_policy = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy()) - alert_policy_to_create = ParseDict(TEST_ALERT_POLICY_2, monitoring_v3.types.alert_pb2.AlertPolicy()) + existing_alert_policy = AlertPolicy(**TEST_ALERT_POLICY_1) + alert_policy_to_create = AlertPolicy(**TEST_ALERT_POLICY_2) mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy] mock_channel_client.return_value.list_notification_channels.return_value = [] @@ -202,38 +187,77 @@ class TestStackdriverHookMethods(unittest.TestCase): project_id=PROJECT_ID, ) mock_channel_client.return_value.list_notification_channels.assert_called_once_with( - name=f'projects/{PROJECT_ID}', - filter_=None, + request=dict( + name=f'projects/{PROJECT_ID}', + filter=None, + order_by=None, + page_size=None, + ), retry=DEFAULT, timeout=DEFAULT, - order_by=None, - page_size=None, - metadata=None, + metadata=(), ) mock_policy_client.return_value.list_alert_policies.assert_called_once_with( - name=f'projects/{PROJECT_ID}', - filter_=None, + request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None), retry=DEFAULT, timeout=DEFAULT, - order_by=None, - page_size=None, - metadata=None, + metadata=(), ) - alert_policy_to_create.ClearField('name') - alert_policy_to_create.ClearField('creation_record') - alert_policy_to_create.ClearField('mutation_record') - alert_policy_to_create.conditions[0].ClearField('name') # pylint: disable=no-member + alert_policy_to_create.name = None + alert_policy_to_create.creation_record = None + alert_policy_to_create.mutation_record = None + alert_policy_to_create.conditions[0].name = None mock_policy_client.return_value.create_alert_policy.assert_called_once_with( - name=f'projects/{PROJECT_ID}', - alert_policy=alert_policy_to_create, + request=dict( + name=f'projects/{PROJECT_ID}', + 
alert_policy=alert_policy_to_create, + ), retry=DEFAULT, timeout=DEFAULT, - metadata=None, + metadata=(), ) - existing_alert_policy.ClearField('creation_record') - existing_alert_policy.ClearField('mutation_record') + existing_alert_policy.creation_record = None + existing_alert_policy.mutation_record = None mock_policy_client.return_value.update_alert_policy.assert_called_once_with( - alert_policy=existing_alert_policy, retry=DEFAULT, timeout=DEFAULT, metadata=None + request=dict(alert_policy=existing_alert_policy), retry=DEFAULT, timeout=DEFAULT, metadata=() + ) + + @mock.patch( + 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook._get_credentials_and_project_id', + return_value=(CREDENTIALS, PROJECT_ID), + ) + @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client') + @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client') + def test_stackdriver_upsert_alert_policy_without_channel( + self, mock_channel_client, mock_policy_client, mock_get_creds_and_project_id + ): + hook = stackdriver.StackdriverHook() + existing_alert_policy = AlertPolicy(**TEST_ALERT_POLICY_1) + + mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy] + mock_channel_client.return_value.list_notification_channels.return_value = [] + + hook.upsert_alert( + alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}), + project_id=PROJECT_ID, + ) + mock_channel_client.return_value.list_notification_channels.assert_called_once_with( + request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None), + metadata=(), + retry=DEFAULT, + timeout=DEFAULT, + ) + mock_policy_client.return_value.list_alert_policies.assert_called_once_with( + request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None), + retry=DEFAULT, + timeout=DEFAULT, + metadata=(), + ) + + existing_alert_policy.creation_record = None + 
existing_alert_policy.mutation_record = None + mock_policy_client.return_value.update_alert_policy.assert_called_once_with( + request=dict(alert_policy=existing_alert_policy), retry=DEFAULT, timeout=DEFAULT, metadata=() ) @mock.patch( @@ -247,10 +271,10 @@ class TestStackdriverHookMethods(unittest.TestCase): name='test-alert', ) mock_policy_client.return_value.delete_alert_policy.assert_called_once_with( - name='test-alert', + request=dict(name='test-alert'), retry=DEFAULT, timeout=DEFAULT, - metadata=None, + metadata=(), ) @mock.patch( @@ -265,13 +289,10 @@ class TestStackdriverHookMethods(unittest.TestCase): project_id=PROJECT_ID, ) mock_channel_client.return_value.list_notification_channels.assert_called_once_with( - name=f'projects/{PROJECT_ID}', - filter_=TEST_FILTER, - order_by=None, - page_size=None, + request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None), retry=DEFAULT, timeout=DEFAULT, - metadata=None, + metadata=(), ) @mock.patch( @@ -283,12 +304,9 @@ class TestStackdriverHookMethods(unittest.TestCase): self, mock_channel_client, mock_get_creds_and_project_id ): hook = stackdriver.StackdriverHook() - notification_channel_enabled = ParseDict( - TEST_NOTIFICATION_CHANNEL_1, monitoring_v3.types.notification_pb2.NotificationChannel() - ) - notification_channel_disabled = ParseDict( - TEST_NOTIFICATION_CHANNEL_2, monitoring_v3.types.notification_pb2.NotificationChannel() - ) + notification_channel_enabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1) + notification_channel_disabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_2) + mock_channel_client.return_value.list_notification_channels.return_value = [ notification_channel_enabled, notification_channel_disabled, @@ -299,15 +317,13 @@ class TestStackdriverHookMethods(unittest.TestCase): project_id=PROJECT_ID, ) - notification_channel_disabled.enabled.value = True # pylint: disable=no-member - mask = monitoring_v3.types.field_mask_pb2.FieldMask() - 
mask.paths.append('enabled') # pylint: disable=no-member + notification_channel_disabled.enabled = True # pylint: disable=no-member + mask = FieldMask(paths=['enabled']) mock_channel_client.return_value.update_notification_channel.assert_called_once_with( - notification_channel=notification_channel_disabled, - update_mask=mask, + request=dict(notification_channel=notification_channel_disabled, update_mask=mask), retry=DEFAULT, timeout=DEFAULT, - metadata=None, + metadata=(), ) @mock.patch( @@ -319,12 +335,8 @@ class TestStackdriverHookMethods(unittest.TestCase): self, mock_channel_client, mock_get_creds_and_project_id ): hook = stackdriver.StackdriverHook() - notification_channel_enabled = ParseDict( - TEST_NOTIFICATION_CHANNEL_1, monitoring_v3.types.notification_pb2.NotificationChannel() - ) - notification_channel_disabled = ParseDict( - TEST_NOTIFICATION_CHANNEL_2, monitoring_v3.types.notification_pb2.NotificationChannel() - ) + notification_channel_enabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1) + notification_channel_disabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_2) mock_channel_client.return_value.list_notification_channels.return_value = [ notification_channel_enabled, notification_channel_disabled, @@ -335,15 +347,13 @@ class TestStackdriverHookMethods(unittest.TestCase): project_id=PROJECT_ID, ) - notification_channel_enabled.enabled.value = False # pylint: disable=no-member - mask = monitoring_v3.types.field_mask_pb2.FieldMask() - mask.paths.append('enabled') # pylint: disable=no-member + notification_channel_enabled.enabled = False # pylint: disable=no-member + mask = FieldMask(paths=['enabled']) mock_channel_client.return_value.update_notification_channel.assert_called_once_with( - notification_channel=notification_channel_enabled, - update_mask=mask, + request=dict(notification_channel=notification_channel_enabled, update_mask=mask), retry=DEFAULT, timeout=DEFAULT, - metadata=None, + metadata=(), ) @mock.patch( @@ -353,12 +363,9 
@@ class TestStackdriverHookMethods(unittest.TestCase): @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client') def test_stackdriver_upsert_channel(self, mock_channel_client, mock_get_creds_and_project_id): hook = stackdriver.StackdriverHook() - existing_notification_channel = ParseDict( - TEST_NOTIFICATION_CHANNEL_1, monitoring_v3.types.notification_pb2.NotificationChannel() - ) - notification_channel_to_be_created = ParseDict( - TEST_NOTIFICATION_CHANNEL_2, monitoring_v3.types.notification_pb2.NotificationChannel() - ) + existing_notification_channel = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1) + notification_channel_to_be_created = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_2) + mock_channel_client.return_value.list_notification_channels.return_value = [ existing_notification_channel ] @@ -367,24 +374,25 @@ class TestStackdriverHookMethods(unittest.TestCase): project_id=PROJECT_ID, ) mock_channel_client.return_value.list_notification_channels.assert_called_once_with( - name=f'projects/{PROJECT_ID}', - filter_=None, - order_by=None, - page_size=None, + request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None), retry=DEFAULT, timeout=DEFAULT, - metadata=None, + metadata=(), ) mock_channel_client.return_value.update_notification_channel.assert_called_once_with( - notification_channel=existing_notification_channel, retry=DEFAULT, timeout=DEFAULT, metadata=None + request=dict(notification_channel=existing_notification_channel), + retry=DEFAULT, + timeout=DEFAULT, + metadata=(), ) - notification_channel_to_be_created.ClearField('name') + notification_channel_to_be_created.name = None mock_channel_client.return_value.create_notification_channel.assert_called_once_with( - name=f'projects/{PROJECT_ID}', - notification_channel=notification_channel_to_be_created, + request=dict( + name=f'projects/{PROJECT_ID}', notification_channel=notification_channel_to_be_created + ), retry=DEFAULT, 
timeout=DEFAULT, - metadata=None, + metadata=(), ) @mock.patch( @@ -400,5 +408,5 @@ class TestStackdriverHookMethods(unittest.TestCase): name='test-channel', ) mock_channel_client.return_value.delete_notification_channel.assert_called_once_with( - name='test-channel', retry=DEFAULT, timeout=DEFAULT, metadata=None + request=dict(name='test-channel'), retry=DEFAULT, timeout=DEFAULT, metadata=() ) diff --git a/tests/providers/google/cloud/operators/test_stackdriver.py b/tests/providers/google/cloud/operators/test_stackdriver.py index 28901b4..50dd997 100644 --- a/tests/providers/google/cloud/operators/test_stackdriver.py +++ b/tests/providers/google/cloud/operators/test_stackdriver.py @@ -21,6 +21,7 @@ import unittest from unittest import mock from google.api_core.gapic_v1.method import DEFAULT +from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel from airflow.providers.google.cloud.operators.stackdriver import ( StackdriverDeleteAlertOperator, @@ -40,16 +41,15 @@ TEST_FILTER = 'filter' TEST_ALERT_POLICY_1 = { "combiner": "OR", "name": "projects/sd-project/alertPolicies/12345", - "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"}, "enabled": True, - "displayName": "test display", + "display_name": "test display", "conditions": [ { - "conditionThreshold": { + "condition_threshold": { "comparison": "COMPARISON_GT", - "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}], + "aggregations": [{"alignment_eriod": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}], }, - "displayName": "Condition display", + "display_name": "Condition display", "name": "projects/sd-project/alertPolicies/123/conditions/456", } ], @@ -58,16 +58,15 @@ TEST_ALERT_POLICY_1 = { TEST_ALERT_POLICY_2 = { "combiner": "OR", "name": "projects/sd-project/alertPolicies/6789", - "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"}, "enabled": False, - "displayName": "test display", + 
"display_name": "test display", "conditions": [ { - "conditionThreshold": { + "condition_threshold": { "comparison": "COMPARISON_GT", - "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}], + "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}], }, - "displayName": "Condition display", + "display_name": "Condition display", "name": "projects/sd-project/alertPolicies/456/conditions/789", } ], @@ -94,7 +93,8 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase): @mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook') def test_execute(self, mock_hook): operator = StackdriverListAlertPoliciesOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER) - operator.execute(None) + mock_hook.return_value.list_alert_policies.return_value = [AlertPolicy(name="test-name")] + result = operator.execute(None) mock_hook.return_value.list_alert_policies.assert_called_once_with( project_id=None, filter_=TEST_FILTER, @@ -105,6 +105,16 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase): timeout=DEFAULT, metadata=None, ) + assert [ + { + 'combiner': 0, + 'conditions': [], + 'display_name': '', + 'name': 'test-name', + 'notification_channels': [], + 'user_labels': {}, + } + ] == result class TestStackdriverEnableAlertPoliciesOperator(unittest.TestCase): @@ -160,7 +170,11 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase): @mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook') def test_execute(self, mock_hook): operator = StackdriverListNotificationChannelsOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER) - operator.execute(None) + mock_hook.return_value.list_notification_channels.return_value = [ + NotificationChannel(name="test-123") + ] + + result = operator.execute(None) mock_hook.return_value.list_notification_channels.assert_called_once_with( project_id=None, filter_=TEST_FILTER, @@ -171,6 +185,17 @@ class 
TestStackdriverListNotificationChannelsOperator(unittest.TestCase): timeout=DEFAULT, metadata=None, ) + assert [ + { + 'description': '', + 'display_name': '', + 'labels': {}, + 'name': 'test-123', + 'type_': '', + 'user_labels': {}, + 'verification_status': 0, + } + ] == result class TestStackdriverEnableNotificationChannelsOperator(unittest.TestCase):
