dabla commented on PR #68377:
URL: https://github.com/apache/airflow/pull/68377#issuecomment-4681048098

   I think I found the issue:
   
   ```
   @task(show_return_value_in_logs=False, task_concurrency=5)
   async def get_user_registered_devices(user_id):
       from airflow.exceptions import AirflowNotFoundException
       from contextlib import suppress

       with suppress(AirflowNotFoundException):
           hook = KiotaRequestAdapterHook.get_hook(conn_id=MSGRAPH_CONN_ID)
           results = await hook.paginated_run(
               url=f"users/{user_id}/registeredDevices",
               query_parameters={"$select": "id"},
           )
           return results
       return []
   ```
   
   We should implement an async get_hook in BaseHook. The deadlock happens 
because the get_hook method calls the sync get_connection method 
underneath.
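
A rough sketch of what I mean, using stand-in classes rather than the real Airflow ones. `SketchBaseHook` and `FakeConnection` are hypothetical, and the `aget_connection` counterpart is assumed here only to show the shape of the idea:

```python
import asyncio


class FakeConnection:
    """Stand-in for an Airflow Connection; not the real class."""

    def __init__(self, conn_id):
        self.conn_id = conn_id

    def get_hook(self, hook_params=None):
        # The real Connection would build a provider hook; a dict is enough here.
        return {"conn_id": self.conn_id, "params": hook_params or {}}


class SketchBaseHook:
    """Illustrative stand-in for BaseHook; only the method names follow the comment."""

    @classmethod
    def get_connection(cls, conn_id):
        # Sync lookup: the calling thread is blocked until it returns.
        return FakeConnection(conn_id)

    @classmethod
    async def aget_connection(cls, conn_id):
        # Assumed async counterpart: gives control back to the event loop.
        await asyncio.sleep(0)
        return FakeConnection(conn_id)

    @classmethod
    def get_hook(cls, conn_id, hook_params=None):
        # Existing sync path: get_hook -> sync get_connection.
        return cls.get_connection(conn_id).get_hook(hook_params=hook_params)

    @classmethod
    async def aget_hook(cls, conn_id, hook_params=None):
        # Proposed async path: resolve the connection without a sync call.
        connection = await cls.aget_connection(conn_id)
        return connection.get_hook(hook_params=hook_params)


async def main():
    return await SketchBaseHook.aget_hook("msgraph_default")
```

In an async task the call would then become `hook = await KiotaRequestAdapterHook.aget_hook(conn_id=MSGRAPH_CONN_ID)`, assuming the method is added as proposed.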
   
   That was also the main difference between my MSGraph case and the SFTP 
one. I didn't have the issue with the SFTP example because there I'm using 
the async SFTPClientPool:
   
   ```
   @task(
       retries=3,
       retry_delay=timedelta(seconds=60),
       show_return_value_in_logs=False,
   )
   async def load_xml_files(file, **context):
       import logging
       from io import BytesIO
       from os import cpu_count
       from airflow.providers.sftp.pools.sftp import SFTPClientPool

       connection = get_connection(sftp_conn)

       async with SFTPClientPool(
           sftp_conn_id=sftp_conn, pool_size=cpu_count()
       ).get_sftp_client() as sftp:
           logging.info("downloading: %s", file)
           buffer = BytesIO()
           async with sftp.open(file, encoding=xml_encoding) as remote_file:
               data = await remote_file.read()
               buffer.write(data.encode(xml_encoding))
               buffer.seek(0)
           # Convert and return rows for DB insert
           return convert_to_rows(
               connection,
               file,
               convert_xml_to_json(
                   buffer,
                   xml_namespaces,
                   xml_encoding,
                   xml_force_list,
               ),
           )
   ```
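
To illustrate the general difference between the two cases (this is not the actual CommDecoder deadlock, just a minimal asyncio sketch with hypothetical function names): a sync call inside a coroutine never yields to the event loop, while an awaited call lets other tasks run in between.

```python
import asyncio
import time


async def heartbeat(events):
    # Stands in for any other work scheduled on the same event loop.
    for _ in range(3):
        events.append("tick")
        await asyncio.sleep(0)  # hand control back to the loop


async def sync_lookup(events):
    events.append("start")
    time.sleep(0.01)  # blocking call: nothing else on the loop can run
    events.append("end")


async def async_lookup(events):
    events.append("start")
    await asyncio.sleep(0)  # awaitable call: other tasks may run here
    events.append("end")


async def run(lookup):
    events = []
    await asyncio.gather(lookup(events), heartbeat(events))
    return events
```

With `sync_lookup` the heartbeat only gets its first turn after the lookup has finished; with `async_lookup` a tick lands between "start" and "end".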
   
   So it's good that you challenged the issue!
   
   Also, I can confirm that the latest version of the CommDecoder send method 
in 3.3 (from main) no longer deadlocks with the 
KiotaRequestAdapterHook.get_hook example.
   I tested that version by monkey patching it onto our 3.2.2. Probably 
something changed in the meantime that fixed the deadlock and is not 
present in 3.2.2.
   Still, I think it's a good thing to add the async aget_hook method to 
BaseHook.

