dabla commented on code in PR #65480:
URL: https://github.com/apache/airflow/pull/65480#discussion_r3289069898


##########
providers/sftp/src/airflow/providers/sftp/triggers/sftp.py:
##########
@@ -139,3 +139,135 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
     def _get_async_hook(self) -> SFTPHookAsync:
         return SFTPHookAsync(sftp_conn_id=self.sftp_conn_id)
+
+
+class SFTPOperatorTrigger(BaseTrigger):
+    """
+    Trigger for SFTPOperator deferrable mode.
+
+    Fires when a file transfer (PUT, GET, or DELETE) completes
+    on the SFTP server, freeing the worker slot during the transfer.
+
+    :param ssh_conn_id: The SSH connection ID to use.
+    :param local_filepath: Local file path(s) to transfer.
+    :param remote_filepath: Remote file path(s) on the SFTP server.
+    :param operation: The SFTP operation - put, get, or delete.
+    :param confirm: Whether to confirm the file transfer.
+    :param create_intermediate_dirs: Whether to create intermediate dirs.
+    :param remote_host: Remote host to connect to (overrides connection).
+    :param concurrency: Number of threads for directory transfers.
+    :param prefetch: Whether to prefetch during file retrieval.
+    """
+
+    def __init__(
+        self,
+        ssh_conn_id: str | None = None,
+        local_filepath: str | list[str] | None = None,
+        remote_filepath: str | list[str] = "",
+        operation: str = "put",
+        confirm: bool = True,
+        create_intermediate_dirs: bool = False,
+        remote_host: str | None = None,
+        concurrency: int = 1,
+        prefetch: bool = True,
+    ) -> None:
+        super().__init__()
+        self.ssh_conn_id = ssh_conn_id
+        self.local_filepath = local_filepath
+        self.remote_filepath = remote_filepath
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.remote_host = remote_host
+        self.concurrency = concurrency
+        self.prefetch = prefetch
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize the trigger for storage in the database."""
+        return (
+            f"{self.__class__.__module__}.{self.__class__.__name__}",
+            {
+                "ssh_conn_id": self.ssh_conn_id,
+                "local_filepath": self.local_filepath,
+                "remote_filepath": self.remote_filepath,
+                "operation": self.operation,
+                "confirm": self.confirm,
+                "create_intermediate_dirs": self.create_intermediate_dirs,
+                "remote_host": self.remote_host,
+                "concurrency": self.concurrency,
+                "prefetch": self.prefetch,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        """Run the file transfer asynchronously and yield a TriggerEvent when done."""
+        try:
+            loop = asyncio.get_running_loop()
+            await loop.run_in_executor(

Review Comment:
   Just call and await the transfer method from the async SFTP hook (`SFTPHookAsync`) directly, instead of going through `loop.run_in_executor`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to