dabla commented on PR #68377:
URL: https://github.com/apache/airflow/pull/68377#issuecomment-4681048098
I think I found the issue:
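```python
from airflow.sdk import task


@task(show_return_value_in_logs=False, task_concurrency=5)
async def get_user_registered_devices(user_id):
    from contextlib import suppress

    from airflow.exceptions import AirflowNotFoundException
    from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook

    with suppress(AirflowNotFoundException):
        # get_hook() is synchronous: underneath it calls the sync get_connection(),
        # which is what deadlocks inside this async task.
        hook = KiotaRequestAdapterHook.get_hook(conn_id=MSGRAPH_CONN_ID)
        results = await hook.paginated_run(
            url=f"users/{user_id}/registeredDevices",
            query_parameters={"$select": "id"},
        )
        return results
    return []
```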
The `get_hook` call is what causes the deadlock: underneath, it calls the
synchronous `get_connection` method from inside the async task. We should
implement an async `get_hook` in `BaseHook`.
That was also the main difference between my MSGraph case and the SFTP one,
and it explains why I didn't hit the issue with the SFTP example: there I'm
using the async `SFTPClientPool`:
```python
from datetime import timedelta

from airflow.sdk import task


@task(
    retries=3,
    retry_delay=timedelta(seconds=60),
    show_return_value_in_logs=False,
)
async def load_xml_files(file, **context):
    import logging
    from io import BytesIO
    from os import cpu_count

    from airflow.providers.sftp.pools.sftp import SFTPClientPool

    connection = get_connection(sftp_conn)
    async with SFTPClientPool(
        sftp_conn_id=sftp_conn, pool_size=cpu_count()
    ).get_sftp_client() as sftp:
        logging.info("downloading: %s", file)
        buffer = BytesIO()
        async with sftp.open(file, encoding=xml_encoding) as remote_file:
            data = await remote_file.read()
            buffer.write(data.encode(xml_encoding))
        buffer.seek(0)
        # Convert and return rows for DB insert
        return convert_to_rows(
            connection,
            file,
            convert_xml_to_json(
                buffer,
                xml_namespaces,
                xml_encoding,
                xml_force_list,
            ),
        )
```
So it's good that you challenged the issue!
I can also confirm that the latest version of the `CommsDecoder.send` method
in 3.3 (from main) no longer deadlocks with the
`KiotaRequestAdapterHook.get_hook` example.
I tested that version by monkey-patching it into our 3.2.2 environment, so
something has probably changed since 3.2.2 that fixes the deadlock.
Still, I think it's worth adding an async `aget_hook` method to `BaseHook`.
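A minimal sketch of what an `aget_hook` could look like: it mirrors `get_hook` but awaits the connection lookup instead of blocking. Everything here is a toy under stated assumptions: `BaseHook`, `Connection`, `aget_connection`, `KiotaLikeHook` and the in-memory `_CONNECTIONS` store are hypothetical stand-ins rather than Airflow's classes, and building the hook by instantiating `cls` is a simplification of what the real `get_hook` does.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Connection:
    conn_id: str
    conn_type: str


# Hypothetical in-memory store standing in for Airflow's connection backends.
_CONNECTIONS = {"msgraph_default": Connection("msgraph_default", "msgraph")}


class BaseHook:
    """Toy stand-in for Airflow's BaseHook, not the real class."""

    def __init__(self, connection: Connection) -> None:
        self.connection = connection

    @classmethod
    def get_connection(cls, conn_id: str) -> Connection:
        # Synchronous lookup: in a real async task this is the blocking call.
        return _CONNECTIONS[conn_id]

    @classmethod
    async def aget_connection(cls, conn_id: str) -> Connection:
        # Hypothetical async lookup; a real one would await I/O here.
        await asyncio.sleep(0)
        return _CONNECTIONS[conn_id]

    @classmethod
    def get_hook(cls, conn_id: str) -> "BaseHook":
        # Simplified: builds the hook from the sync lookup.
        return cls(cls.get_connection(conn_id))

    @classmethod
    async def aget_hook(cls, conn_id: str) -> "BaseHook":
        # Proposed async counterpart: same result, but awaits the lookup
        # so the event loop keeps running.
        return cls(await cls.aget_connection(conn_id))


class KiotaLikeHook(BaseHook):
    """Hypothetical subclass standing in for a provider hook."""


async def main() -> None:
    hook = await KiotaLikeHook.aget_hook("msgraph_default")
    print(type(hook).__name__, hook.connection.conn_id)  # KiotaLikeHook msgraph_default


if __name__ == "__main__":
    asyncio.run(main())
```

Keeping `get_hook` and `aget_hook` side by side lets sync tasks stay unchanged while async tasks opt into the awaited variant.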