opeida opened a new issue, #53447:
URL: https://github.com/apache/airflow/issues/53447
### Apache Airflow version
3.0.3
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
The error `RuntimeError: You cannot use AsyncToSync in the same thread as an
async event loop - just await the async function directly.` occurs when
`BigQueryHook` is initialized in `async def run(self)`. The issue began
immediately after upgrading from 3.0.2 to 3.0.3.
```Python
class BigQueryTaskStatusTrigger(BaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]:
try:
hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id,
use_legacy_sql=False)
while True:
status = await self._get_status(hook)
```
The error can be worked around by initializing `BigQueryHook` in a function
decorated with `@sync_to_async`, but this does not look like best practice:
```Python
@sync_to_async
def _get_status(self) -> str | None:
hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, use_legacy_sql=False)
self.log.info(f"Retrieving status for task {self.monitored_task_id}")
bq_client = hook.get_client(project_id=self.project_id)
job = bq_client.query(self.query)
result = list(job.result())
return result[0].status if result else None
```
### What you think should happen instead?
The hook should be initialized without errors.
### How to reproduce
**dag.py:**
```Python
from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from .sensors import BigQueryTaskStatusSensor
with DAG(
f"trigger_test_dag",
) as dag:
start = EmptyOperator(task_id="start")
check_task_completed = BigQueryTaskStatusSensor(
task_id="check_task_completed",
monitored_task_id="test-task-1",
deferrable=True
)
end = EmptyOperator(task_id="end", trigger_rule="none_failed")
start >> check_task_completed >> end
if __name__ == "__main__":
dag.test()
```
**sensors.py:**
```Python
from collections.abc import Sequence
from datetime import timedelta
from typing import Any
from airflow.configuration import conf
from airflow.exceptions import AirflowFailException
from airflow.sdk.definitions.context import Context
from airflow.sensors.base import BaseSensorOperator
from .triggers import BigQueryTaskStatusTrigger
class BigQueryTaskStatusSensor(BaseSensorOperator):
template_fields: Sequence[str] = ("monitored_task_id", "gcp_conn_id",
"project_id", "dataset", "table")
def __init__(
self,
*,
monitored_task_id: str,
gcp_conn_id: str = "gcp-adders-automation",
project_id: str = "automation",
dataset: str = "cloud_functions_logs",
table: str = "airflow_transfer_data_to_bq",
deferrable: bool = conf.getboolean("operators",
"default_deferrable", fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
self.monitored_task_id = monitored_task_id
self.gcp_conn_id = gcp_conn_id
self.project_id = project_id
self.dataset = dataset
self.table = table
self.deferrable = deferrable
def execute(self, context: Context) -> Any:
if self.deferrable:
self.defer(
trigger=BigQueryTaskStatusTrigger(
monitored_task_id=self.monitored_task_id,
gcp_conn_id=self.gcp_conn_id,
project_id=self.project_id,
query=self._get_sql_query(),
target_status="SUCCESS",
fail_status="FAILED",
poke_interval=15
),
timeout=timedelta(minutes=15),
method_name="execute_complete"
)
else:
raise AirflowFailException("The sensor must be deferrable.")
def execute_complete(self, context: Context, event: dict[str, Any] |
None = None) -> Any:
return
def _get_sql_query(self) -> str:
return f"""SELECT status
FROM `{self.project_id}.{self.dataset}.{self.table}`
WHERE task_name = '{self.monitored_task_id}'
ORDER BY created_at DESC
LIMIT 1
;
"""
```
**triggers.py:**
```Python
import asyncio
from collections.abc import AsyncIterator
from typing import Any
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.triggers.base import BaseTrigger, TaskFailedEvent, TriggerEvent
class BigQueryTaskStatusTrigger(BaseTrigger):
def __init__(
self,
monitored_task_id: str,
gcp_conn_id: str,
project_id: str,
query: str,
target_status: str,
fail_status: str,
poke_interval: float,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.monitored_task_id = monitored_task_id
self.gcp_conn_id = gcp_conn_id
self.project_id = project_id
self.query = query
self.target_status = target_status
self.fail_status = fail_status
self.poke_interval = poke_interval
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"common.airflow_custom.triggers.BigQueryTaskStatusTrigger",
{
"monitored_task_id": self.monitored_task_id,
"gcp_conn_id": self.gcp_conn_id,
"project_id": self.project_id,
"query": self.query,
"target_status": self.target_status,
"fail_status": self.fail_status,
"poke_interval": self.poke_interval,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
try:
hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id,
use_legacy_sql=False)
while True:
status = await self._get_status(hook)
if not status:
yield TaskFailedEvent(
f"An error occurred while receiving status for task:
{self.monitored_task_id}"
)
return
match status:
case self.target_status:
yield TriggerEvent({"monitored_task_id":
self.monitored_task_id})
return
case self.fail_status:
yield TaskFailedEvent(f"Task
{self.monitored_task_id} failed")
return
case _:
self.log.info(f"Task {self.monitored_task_id} is in
status {status}")
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TaskFailedEvent(f"Something went wrong.\n{str(e)}")
return
async def _get_status(self, hook: BigQueryHook) -> str | None:
self.log.info(f"Retrieving status for task {self.monitored_task_id}")
bq_client = hook.get_client(project_id=self.project_id)
job = bq_client.query(self.query)
result = list(job.result())
return result[0].status if result else None
```
### Operating System
Debian GNU/Linux 12 (bookworm)
### Versions of Apache Airflow Providers
apache-airflow-providers-celery==3.12.1
apache-airflow-providers-common-compat==1.7.2
apache-airflow-providers-common-io==1.6.1
apache-airflow-providers-common-sql==1.27.3
apache-airflow-providers-fab==2.3.0
apache-airflow-providers-google==16.1.0
apache-airflow-providers-http==5.3.2
apache-airflow-providers-postgres==6.2.1
apache-airflow-providers-redis==4.1.1
apache-airflow-providers-smtp==2.1.1
apache-airflow-providers-standard==1.4.0
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
Deployed on GKE with extended image based on
`apache/airflow:slim-3.0.3-python3.12` and Helm chart 1.17.0
### Anything else?
```
Trigger failed:
Traceback (most recent call last):
File "/opt/airflow/dags/repo/dags/common/airflow_custom/triggers.py", line
47, in run
hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, use_legacy_sql=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/hooks/bigquery.py",
line 167, in __init__
super().__init__(**kwargs)
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/common/hooks/base_google.py",
line 284, in __init__
self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/hooks/base.py", line
64, in get_connection
conn = Connection.get_connection_from_secrets(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py",
line 472, in get_connection_from_secrets
conn = TaskSDKConnection.get(conn_id=conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py",
line 142, in get
return _get_connection(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py",
line 155, in _get_connection
msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 713, in send
return async_to_sync(self.asend)(msg)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py",
line 186, in __call__
raise RuntimeError(
RuntimeError: You cannot use AsyncToSync in the same thread as an async
event loop - just await the async function directly.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 936, in cleanup_finished_triggers
result = details["task"].result()
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 1045, in run_trigger
async for event in trigger.run():
File "/opt/airflow/dags/repo/dags/common/airflow_custom/triggers.py", line
66, in run
yield TaskFailedEvent(f"Something went wrong.\n{str(e)}")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: BaseTaskEndEvent.__init__() takes 1 positional argument but 2
were given
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]