dabla commented on code in PR #65480:
URL: https://github.com/apache/airflow/pull/65480#discussion_r3289047110
##########
providers/sftp/src/airflow/providers/sftp/hooks/sftp.py:
##########
@@ -1040,3 +1109,46 @@ async def get_mod_time(self, path: str) -> str: # type:
ignore[return]
return mod_time
except asyncssh.SFTPNoSuchFile:
raise AirflowException("No files matching")
+
+ async def transfer(
+ self,
+ operation: str,
+ local_filepath: str | list[str] | None,
+ remote_filepath: str | list[str],
+ confirm: bool = True,
+ create_intermediate_dirs: bool = False,
+ concurrency: int = 1,
+ prefetch: bool = True,
+ ) -> None:
+ """
+ Perform an SFTP transfer operation (GET, PUT, or DELETE) via a thread
executor.
+
+ Delegates to :meth:`SFTPHook.transfer` so that both the operator and
the
+ trigger share identical transfer logic, in line with the DRY principle.
+ Using a thread executor keeps the async event loop unblocked while the
+ synchronous paramiko transfer runs in a worker thread.
+
+ :param operation: The SFTP operation - put, get, or delete.
+ :param local_filepath: Local file path(s).
+ :param remote_filepath: Remote file path(s).
+ :param confirm: Whether to confirm file size after PUT (default: True).
+ :param create_intermediate_dirs: Create missing intermediate
directories (default: False).
+ :param concurrency: Number of threads for directory transfers
(default: 1).
+ :param prefetch: Whether to prefetch during GET (default: True).
+ """
+ import asyncio
+
+ loop = asyncio.get_running_loop()
+ sync_hook = SFTPHook(ssh_conn_id=self.sftp_conn_id)
+ await loop.run_in_executor(
+ None,
+ lambda: sync_hook.transfer(
+ operation=operation,
+ local_filepath=local_filepath,
+ remote_filepath=remote_filepath,
+ confirm=confirm,
+ create_intermediate_dirs=create_intermediate_dirs,
+ concurrency=concurrency,
+ prefetch=prefetch,
+ ),
+ )
Review Comment:
Just wondering, can't this just be written as:
```
sync_hook = SFTPHook(ssh_conn_id=self.sftp_conn_id)
asyncio.run(
sync_hook.transfer(
operation=operation,
local_filepath=local_filepath,
remote_filepath=remote_filepath,
confirm=confirm,
create_intermediate_dirs=create_intermediate_dirs,
concurrency=concurrency,
prefetch=prefetch,
),
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]