josh-fell commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r775321471



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, 
_make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a 
versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as 
basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT 
operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, 
defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories 
when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    Summary, support arguments:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder 
`/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder 
`/tmp/dir_for_local_transfer/`
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,

Review comment:
       ```suggestion
           local_files_path: Optional[List[str]] = None,
           remote_files_path: Optional[List[str]] = None,
   ```
   For consistency, it may be nice to add typing annotations to the other 
parameters as well.

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,283 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, 
_make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a 
versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as 
basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT 
operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, 
defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories 
when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if 
they
+        don't exist. If the parameter is not passed it would error as the 
directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or 
self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(

Review comment:
       I think we should be very careful about adding type validations to 
template fields (IMO these validations should move under the `execute()` 
scope). There might be unexpected behavior if users pass in `XComArgs` which 
are not rendered/evaluated until `execute()`. The `XComArgs` may indeed render 
to lists or strings but their types will be evaluated as `XComArg` within 
`__init__()`.

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, 
_make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a 
versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as 
basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT 
operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, 
defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories 
when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    Summary, support arguments:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder 
`/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder 
`/tmp/dir_for_local_transfer/`
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or 
self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(
+                """
+                Unsupported argument pool,
+                Possible options for PUT:
+                    1.optional(regexp_mask:str) + local_folder:str + 
remote_folder:str
+                    2.local_files_path:list + remote_folder:str
+                Possible options for GET:
+                    1.local_folder:str + remote_folder:str + 
optional(regexp_mask:str)
+                    2.local_folder:str + remote_files_path:list
+                """
+            )
+
+    def execute(self, context: Any) -> str:
+        dump_file_name_for_log = None
+        try:
+            _check_conn(self)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                sftp_client = ssh_client.open_sftp()
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if self.local_folder and self.remote_folder:
+                        files_list = 
self._search_files(os.listdir(self.local_folder))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            
self._check_remote_file(f"{self.remote_folder}/{local_file}", sftp_client)
+                            self._transfer(sftp_client, self.local_folder, 
local_file, self.remote_folder)
+                    if self.local_files_path and self.remote_folder:
+                        for file in self.local_files_path:
+                            local_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            
self._check_remote_file(f"{self.remote_folder}/{local_file}", sftp_client)
+                            self._transfer(sftp_client, os.path.dirname(file), 
local_file, self.remote_folder)
+                elif self.operation.lower() == SFTPOperation.GET:
+                    if self.remote_folder and self.local_folder:
+                        files_list = 
self._search_files(sftp_client.listdir(self.remote_folder))
+                        for file in files_list:
+                            remote_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            
self._check_local_file(f"{self.local_folder}/{remote_file}")
+                            self._transfer(sftp_client, self.local_folder, 
remote_file, self.remote_folder)
+                    if self.remote_files_path and self.local_folder:
+                        for file in self.remote_files_path:
+                            remote_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            
self._check_local_file(f"{self.local_folder}/{remote_file}")
+                            self._transfer(sftp_client, self.local_folder, 
remote_file, os.path.dirname(file))
+
+        except Exception as e:
+            raise AirflowException(f"Error while transferring 
{dump_file_name_for_log}, error: {str(e)}")
+
+        return self.local_folder

Review comment:
       Should this return `self.local_folder` and/or `self.remote_folder` 
depending on what is provided?

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, 
_make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a 
versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as 
basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT 
operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, 
defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories 
when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    Summary, support arguments:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder 
`/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder 
`/tmp/dir_for_local_transfer/`
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or 
self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(
+                """
+                Unsupported argument pool,
+                Possible options for PUT:
+                    1.optional(regexp_mask:str) + local_folder:str + 
remote_folder:str
+                    2.local_files_path:list + remote_folder:str
+                Possible options for GET:
+                    1.local_folder:str + remote_folder:str + 
optional(regexp_mask:str)
+                    2.local_folder:str + remote_files_path:list
+                """
+            )
+
+    def execute(self, context: Any) -> str:

Review comment:
       Now that 2.2.3 has been released, the typing of `context` can be:
   
   ```python
   from typing import TYPE_CHECKING
   
   if TYPE_CHECKING:
       from airflow.utils.context import Context
   
   ...
   def execute(self, context: "Context") -> str:
   ...
   ```

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, 
_make_intermediate_dirs

Review comment:
       WDYT about moving `SFTPOperation `, `_check_conn` and 
`_make_intermediate_dirs` into a centralized location in the provider since 
these are used across multiple operators now?
   
   Perhaps `_check_conn` and `_make_intermediate_dirs` can live in a 
`providers/sftp/utils/` directory? A pattern in multiple providers for classes 
like `SFTPOperation` is to store them in the corresponding hook.
   
   

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, 
_make_intermediate_dirs

Review comment:
       Actually, maybe the move is to have both the `SFTPOperator` and 
`SFTPBatchOperator` both live in a single `providers/sftp/operators/sftp.py` 
file.

##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,147 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.py` to
+transfer data between servers under sftp.
+
+Using the Operator
+------------------
+To start working with an operator, you need to register an SFTP \ SSH 
connection in Airflow Connections.
+Use ssh_conn_id to specify the name of the connection.

Review comment:
       Is this doc meant to describe both `SFTPOperator` and 
`SFTPBatchOperator` or just one of the operators? This sections reads like the 
doc is about `SFTPOperator` only but there are code snippets for 
`SFTPBatchOperator` as well further on.

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, 
_make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a 
versa.

Review comment:
       ```suggestion
       SFTPBatchOperator for transferring files from remote host to local or 
vice a versa.
   ```
   Also, for users it might be nice to have more details about how this 
operator is different than `SFTPOperator` when reading the API documentation 
(which is generated from the docstring).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to