pankajastro commented on code in PR #30852: URL: https://github.com/apache/airflow/pull/30852#discussion_r1177977785
########## airflow/providers/google/cloud/hooks/cloud_sql.py: ########## @@ -407,6 +408,51 @@ def _wait_for_operation_to_complete(self, project_id: str, operation_name: str) ) +class CloudSQLAsyncHook(GoogleBaseAsyncHook): + """Class to get asynchronous hook for Google Cloud SQL.""" + + sync_hook_class = CloudSQLHook + + async def get_conn(self, session, url): + + scopes = [ + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/sqlservice.admin", + ] + + async with Token(scopes=scopes) as token: + session_aio = AioSession(session) + + headers = { + "Authorization": f"Bearer {await token.get()}", + } + + try: + operation = await session_aio.get(url=url, headers=headers) + except AirflowException: + pass + + return operation + + async def get_operation_name(self, project_id: str, operation_name: str, session): + url = f"https://sqladmin.googleapis.com/sql/v1beta4/projects/{project_id}/operations/{operation_name}" + return await self.get_conn(url=str(url), session=session) + + async def get_operation(self, project_id: str, operation_name: str): + async with ClientSession() as session: + try: + operation = await self.get_operation_name( + project_id=project_id, + operation_name=operation_name, + session=session, + ) + operation = await operation.json(content_type=None) + except Exception as e: Review Comment: would it be possible to catch the error related to this API only? The exception looks very broad. 
Also since will are trying to read some param from this exception trigger so reducing scope make sense to me ########## airflow/providers/google/cloud/hooks/cloud_sql.py: ########## @@ -407,6 +408,51 @@ def _wait_for_operation_to_complete(self, project_id: str, operation_name: str) ) +class CloudSQLAsyncHook(GoogleBaseAsyncHook): + """Class to get asynchronous hook for Google Cloud SQL.""" + + sync_hook_class = CloudSQLHook + + async def get_conn(self, session, url): + + scopes = [ + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/sqlservice.admin", + ] + + async with Token(scopes=scopes) as token: + session_aio = AioSession(session) + + headers = { + "Authorization": f"Bearer {await token.get()}", + } + + try: + operation = await session_aio.get(url=url, headers=headers) + except AirflowException: + pass Review Comment: why we are ignoring error? ########## airflow/providers/google/cloud/hooks/cloud_sql.py: ########## @@ -407,6 +408,51 @@ def _wait_for_operation_to_complete(self, project_id: str, operation_name: str) ) +class CloudSQLAsyncHook(GoogleBaseAsyncHook): + """Class to get asynchronous hook for Google Cloud SQL.""" + + sync_hook_class = CloudSQLHook + + async def get_conn(self, session, url): + + scopes = [ + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/sqlservice.admin", + ] + + async with Token(scopes=scopes) as token: + session_aio = AioSession(session) + + headers = { + "Authorization": f"Bearer {await token.get()}", + } + + try: + operation = await session_aio.get(url=url, headers=headers) + except AirflowException: + pass + + return operation + + async def get_operation_name(self, project_id: str, operation_name: str, session): + url = f"https://sqladmin.googleapis.com/sql/v1beta4/projects/{project_id}/operations/{operation_name}" + return await self.get_conn(url=str(url), session=session) + + async def get_operation(self, project_id: str, operation_name: str): + async 
with ClientSession() as session: + try: + operation = await self.get_operation_name( + project_id=project_id, + operation_name=operation_name, + session=session, + ) + operation = await operation.json(content_type=None) + except Exception as e: + self.log.info("Retrieving operation finished with errors.") Review Comment: since we are logging similar messages in the trigger too so do we need to log at both places? ########## airflow/providers/google/cloud/operators/cloud_sql.py: ########## @@ -933,6 +934,8 @@ class CloudSQLExportInstanceOperator(CloudSQLBaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param deferrable: Run operator in the deferrable mode. + :param poke_interval: Time (seconds) to wait between calls to check the run status. Review Comment: ```suggestion :param poke_interval: (Deferrable mode only) Time (seconds) to wait between calls to check the run status. ``` ########## airflow/providers/google/cloud/hooks/cloud_sql.py: ########## @@ -156,7 +158,7 @@ def create_instance(self, body: dict, project_id: str) -> None: .execute(num_retries=self.num_retries) ) operation_name = response["name"] - self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name) + self.wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name) Review Comment: curious why we are making it public? ########## airflow/providers/google/cloud/triggers/cloud_sql.py: ########## @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""This module contains Google Cloud SQL triggers.""" +from __future__ import annotations + +import asyncio +from typing import Sequence + +from airflow.providers.google.cloud.hooks.cloud_sql import CloudSQLAsyncHook, CloudSqlOperationStatus +from airflow.triggers.base import BaseTrigger, TriggerEvent + + +class CloudSQLExportTrigger(BaseTrigger): + """ + Trigger that periodically polls information from Cloud SQL API to verify job status. + Implementation leverages asynchronous transport. 
+ """ + + def __init__( + self, + operation_name: str, + project_id: str | None = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + poke_interval: int = 20, + ): + super().__init__() + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + self.operation_name = operation_name + self.project_id = project_id + self.poke_interval = poke_interval + self.hook = CloudSQLAsyncHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + + def serialize(self): + return ( + "airflow.providers.google.cloud.triggers.cloud_sql.CloudSQLExportTrigger", + { + "operation_name": self.operation_name, + "project_id": self.project_id, + "gcp_conn_id": self.gcp_conn_id, + "impersonation_chain": self.impersonation_chain, + "poke_interval": self.poke_interval, + }, + ) + + async def run(self): + while True: + try: + operation = await self.hook.get_operation( + project_id=self.project_id, operation_name=self.operation_name + ) + + if operation["status"] == CloudSqlOperationStatus.DONE: + if "error" in operation: + yield TriggerEvent( Review Comment: would it be possible to combine error and success TriggerEvent and check directly in execute_complete ``` if if operation["status"] == CloudSqlOperationStatus.DONE: yield TriggerEvent( { "operation_name": operation["name"], "status": operation["status"], } ) ``` and in execute_complete ``` if event["statue"] == "success": log("success") else: raise error ``` ########## airflow/providers/google/cloud/triggers/cloud_sql.py: ########## @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""This module contains Google Cloud SQL triggers.""" +from __future__ import annotations + +import asyncio +from typing import Sequence + +from airflow.providers.google.cloud.hooks.cloud_sql import CloudSQLAsyncHook, CloudSqlOperationStatus +from airflow.triggers.base import BaseTrigger, TriggerEvent + + +class CloudSQLExportTrigger(BaseTrigger): Review Comment: can we register this class in provider.yaml ########## airflow/providers/google/cloud/triggers/cloud_sql.py: ########## @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""This module contains Google Cloud SQL triggers.""" +from __future__ import annotations + +import asyncio +from typing import Sequence + +from airflow.providers.google.cloud.hooks.cloud_sql import CloudSQLAsyncHook, CloudSqlOperationStatus +from airflow.triggers.base import BaseTrigger, TriggerEvent + + +class CloudSQLExportTrigger(BaseTrigger): + """ + Trigger that periodically polls information from Cloud SQL API to verify job status. + Implementation leverages asynchronous transport. + """ + + def __init__( + self, + operation_name: str, + project_id: str | None = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + poke_interval: int = 20, + ): + super().__init__() + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + self.operation_name = operation_name + self.project_id = project_id + self.poke_interval = poke_interval + self.hook = CloudSQLAsyncHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + + def serialize(self): + return ( + "airflow.providers.google.cloud.triggers.cloud_sql.CloudSQLExportTrigger", + { + "operation_name": self.operation_name, + "project_id": self.project_id, + "gcp_conn_id": self.gcp_conn_id, + "impersonation_chain": self.impersonation_chain, + "poke_interval": self.poke_interval, + }, + ) + + async def run(self): + while True: + try: + operation = await self.hook.get_operation( + project_id=self.project_id, operation_name=self.operation_name + ) + + if operation["status"] == CloudSqlOperationStatus.DONE: + if "error" in operation: + yield TriggerEvent( + { + "operation_name": operation["name"], + "operation_status": operation["status"], + "status": "error", + "message": operation["error"]["message"], + } + ) + return + yield TriggerEvent( + { + "operation_name": operation["name"], + "operation_status": operation["status"], + "status": "success", + "message": "Operation is successfully ended.", + } + ) + 
return + else: Review Comment: can we combine both logs into a single log, something like ``` self.log("Operation is %s, sleeping for %s seconds", status, self.poke_interval) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
