Repository: incubator-airflow
Updated Branches:
  refs/heads/master 398746d8f -> f80138486


[AIRFLOW-442] Add SFTPHook

Closes #2487 from sdiazb/sftp_hook


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f8013848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f8013848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f8013848

Branch: refs/heads/master
Commit: f80138486e7828ff64fa544b4032a445b2ca140a
Parents: 398746d
Author: biellls <bielllob...@gmail.com>
Authored: Sat Mar 10 15:12:07 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Sat Mar 10 15:12:07 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py     |   1 +
 airflow/contrib/hooks/ftp_hook.py     |   1 +
 airflow/contrib/hooks/sftp_hook.py    | 164 +++++++++++++++++++++++++++++
 airflow/utils/db.py                   |   7 ++
 docs/code.rst                         |   1 +
 setup.py                              |   3 +-
 tests/contrib/hooks/test_sftp_hook.py | 109 +++++++++++++++++++
 7 files changed, 285 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f8013848/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 83e9f1f..3e7cfb1 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -37,6 +37,7 @@ _hooks = {
     'ftps_hook': ['FTPSHook'],
     'vertica_hook': ['VerticaHook'],
     'ssh_hook': ['SSHHook'],
+    'sftp_hook': ['SFTPHook'],
     'bigquery_hook': ['BigQueryHook'],
     'qubole_hook': ['QuboleHook'],
     'gcs_hook': ['GoogleCloudStorageHook'],

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f8013848/airflow/contrib/hooks/ftp_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py
index bd46ba5..84ed60c 100644
--- a/airflow/contrib/hooks/ftp_hook.py
+++ b/airflow/contrib/hooks/ftp_hook.py
@@ -91,6 +91,7 @@ class FTPHook(BaseHook, LoggingMixin):
         """
         conn = self.conn
         conn.quit()
+        self.conn = None
 
     def describe_directory(self, path):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f8013848/airflow/contrib/hooks/sftp_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sftp_hook.py b/airflow/contrib/hooks/sftp_hook.py
new file mode 100644
index 0000000..adff84e
--- /dev/null
+++ b/airflow/contrib/hooks/sftp_hook.py
@@ -0,0 +1,164 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import stat
+import pysftp
+import logging
+import datetime
+from airflow.hooks.base_hook import BaseHook
+
+
+class SFTPHook(BaseHook):
+    """
+    Interact with SFTP. Aims to be interchangeable with FTPHook.
+
+    Pitfalls: - In contrast with FTPHook, describe_directory only returns
+                size, type and modify. It doesn't return unix.owner,
+                unix.mode, perm, unix.group and unique.
+              - retrieve_file and store_file only take a local full path
+                and not a buffer.
+              - If no mode is passed to create_directory it will be
+                created with 777 permissions.
+
+    Errors may occur throughout but should be handled downstream.
+    """
+
+    def __init__(self, ftp_conn_id='sftp_default'):
+        self.ftp_conn_id = ftp_conn_id
+        self.conn = None  # opened lazily by get_conn()
+
+    def get_conn(self):
+        """
+        Returns an SFTP connection object, opening it on first use from
+        the Airflow connection (host, port, login, password and the extra
+        keys private_key, private_key_pass, ignore_hostkey_verification).
+        """
+        if self.conn is None:
+            params = self.get_connection(self.ftp_conn_id)
+            extra = params.extra_dejson
+            cnopts = pysftp.CnOpts()
+            if extra.get('ignore_hostkey_verification'):
+                cnopts.hostkeys = None
+            conn_params = {
+                'host': params.host,
+                'username': params.login,
+                'cnopts': cnopts
+            }
+            # Leave the port out when unset so pysftp falls back to 22.
+            if params.port is not None:
+                conn_params['port'] = params.port
+            if params.password is not None:
+                conn_params['password'] = params.password
+            for key in ('private_key', 'private_key_pass'):
+                if key in extra:
+                    conn_params[key] = extra[key]
+            self.conn = pysftp.Connection(**conn_params)
+        return self.conn
+
+    def close_conn(self):
+        """
+        Closes the connection. Does nothing if the connection
+        was never opened or has already been closed.
+        """
+        if self.conn is not None:
+            self.conn.close()
+            self.conn = None
+
+    def describe_directory(self, path):
+        """
+        Returns a dictionary of {filename: {attributes}} for all files
+        in the remote directory. Each attribute dict holds ``size``,
+        ``type`` ('dir' or 'file') and ``modify`` (%Y%m%d%H%M%S).
+        :param path: full path to the remote directory
+        :type path: str
+        """
+        files = {}
+        for f in self.get_conn().listdir_attr(path):
+            modify = datetime.datetime.fromtimestamp(
+                f.st_mtime).strftime('%Y%m%d%H%M%S')
+            files[f.filename] = {
+                'size': f.st_size,
+                'type': 'dir' if stat.S_ISDIR(f.st_mode) else 'file',
+                'modify': modify}
+        return files
+
+    def list_directory(self, path):
+        """
+        Returns a list of files on the remote system.
+        :param path: full path to the remote directory to list
+        :type path: str
+        """
+        return self.get_conn().listdir(path)
+
+    def create_directory(self, path, mode=777):
+        """
+        Creates a directory on the remote system.
+        :param path: full path to the remote directory to create
+        :type path: str
+        :param mode: int representation of octal mode, e.g. 755 (not 0o755)
+        :type mode: int
+        """
+        self.get_conn().mkdir(path, mode)
+
+    def delete_directory(self, path):
+        """
+        Deletes a directory on the remote system.
+        :param path: full path to the remote directory to delete
+        :type path: str
+        """
+        self.get_conn().rmdir(path)
+
+    def retrieve_file(self, remote_full_path, local_full_path):
+        """
+        Transfers the remote file to a local location.
+        The file will be written to the path given by local_full_path.
+        :param remote_full_path: full path to the remote file
+        :type remote_full_path: str
+        :param local_full_path: full path to the local file
+        :type local_full_path: str
+        """
+        conn = self.get_conn()
+        logging.info('Retrieving file from FTP: %s', remote_full_path)
+        conn.get(remote_full_path, local_full_path)
+        logging.info(
+            'Finished retrieving file from FTP: %s', remote_full_path)
+
+    def store_file(self, remote_full_path, local_full_path):
+        """
+        Transfers a local file to the remote location.
+        The file will be read from the path given by local_full_path.
+        :param remote_full_path: full path to the remote file
+        :type remote_full_path: str
+        :param local_full_path: full path to the local file
+        :type local_full_path: str
+        """
+        self.get_conn().put(local_full_path, remote_full_path)
+
+    def delete_file(self, path):
+        """
+        Removes a file on the SFTP server.
+        :param path: full path to the remote file
+        :type path: str
+        """
+        self.get_conn().remove(path)
+
+    def get_mod_time(self, path):
+        """
+        Returns the modification time of a remote file as %Y%m%d%H%M%S.
+        :param path: full path to the remote file
+        :type path: str
+        """
+        mtime = self.get_conn().stat(path).st_mtime
+        return datetime.datetime.fromtimestamp(mtime).strftime(
+            '%Y%m%d%H%M%S')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f8013848/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 2a38424..6acab4f 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -174,6 +174,13 @@ def initdb():
             host='localhost'))
     merge_conn(
         models.Connection(
+            conn_id='sftp_default', conn_type='sftp',
+            host='localhost', port=22, login='travis',
+            extra='''
+                {"private_key": "~/.ssh/id_rsa", 
"ignore_hostkey_verification": true}
+            '''))
+    merge_conn(
+        models.Connection(
             conn_id='fs_default', conn_type='fs',
             extra='{"path": "/"}'))
     merge_conn(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f8013848/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 5c1b000..738e023 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -327,6 +327,7 @@ Community contributed hooks
 .. autoclass:: airflow.contrib.hooks.vertica_hook.VerticaHook
 .. autoclass:: airflow.contrib.hooks.ftp_hook.FTPHook
 .. autoclass:: airflow.contrib.hooks.ssh_hook.SSHHook
+.. autoclass:: airflow.contrib.hooks.sftp_hook.SFTPHook
 .. autoclass:: airflow.contrib.hooks.cloudant_hook.CloudantHook
 .. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
 .. autoclass:: airflow.contrib.hooks.gcp_pubsub_hook.PubSubHook

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f8013848/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 83e88cb..20742d4 100644
--- a/setup.py
+++ b/setup.py
@@ -143,7 +143,7 @@ mysql = ['mysqlclient>=1.3.6']
 rabbitmq = ['librabbitmq>=1.6.1']
 oracle = ['cx_Oracle>=5.1.2']
 postgres = ['psycopg2-binary>=2.7.4']
-ssh = ['paramiko>=2.1.1']
+ssh = ['paramiko>=2.1.1', 'pysftp>=0.2.9']
 salesforce = ['simple-salesforce>=0.72']
 s3 = ['boto3>=1.0.0']
 samba = ['pysmbclient>=0.1.3']
@@ -183,6 +183,7 @@ devel = [
     'qds-sdk>=1.9.6',
     'rednose',
     'paramiko',
+    'pysftp',
     'requests_mock'
 ]
 devel_minreq = devel + kubernetes + mysql + doc + password + s3 + cgroups

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f8013848/tests/contrib/hooks/test_sftp_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_sftp_hook.py b/tests/contrib/hooks/test_sftp_hook.py
new file mode 100644
index 0000000..00aaaee
--- /dev/null
+++ b/tests/contrib/hooks/test_sftp_hook.py
@@ -0,0 +1,109 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import unittest
+import shutil
+import os
+import pysftp
+
+from airflow import configuration
+from airflow.contrib.hooks.sftp_hook import SFTPHook
+
+TMP_PATH = '/tmp'
+TMP_DIR_FOR_TESTS = 'tests_sftp_hook_dir'
+TMP_FILE_FOR_TESTS = 'test_file.txt'
+
+
+class SFTPHookTest(unittest.TestCase):
+    """
+    Exercises SFTPHook against a live server through the default
+    'sftp_default' connection. The tests create files locally under
+    TMP_PATH and read them back over SFTP, so the server has to expose
+    the local filesystem.
+    """
+
+    def setUp(self):
+        configuration.load_test_config()
+        self.hook = SFTPHook()
+        # Local fixtures; the tests reach them again through SFTP.
+        os.makedirs(os.path.join(TMP_PATH, TMP_DIR_FOR_TESTS))
+        with open(os.path.join(TMP_PATH, TMP_FILE_FOR_TESTS), 'a') as f:
+            f.write('Test file')
+
+    def test_get_conn(self):
+        output = self.hook.get_conn()
+        self.assertIsInstance(output, pysftp.Connection)
+
+    def test_close_conn(self):
+        self.hook.get_conn()
+        self.assertIsNotNone(self.hook.conn)
+        self.hook.close_conn()
+        self.assertIsNone(self.hook.conn)
+
+    def test_describe_directory(self):
+        output = self.hook.describe_directory(TMP_PATH)
+        self.assertIn(TMP_DIR_FOR_TESTS, output)
+
+    def test_list_directory(self):
+        output = self.hook.list_directory(
+            path=os.path.join(TMP_PATH, TMP_DIR_FOR_TESTS))
+        self.assertEqual(output, [])
+
+    def test_create_and_delete_directory(self):
+        new_dir_name = 'new_dir'
+        parent = os.path.join(TMP_PATH, TMP_DIR_FOR_TESTS)
+        new_dir = os.path.join(parent, new_dir_name)
+        self.hook.create_directory(new_dir)
+        self.assertIn(new_dir_name, self.hook.describe_directory(parent))
+        self.hook.delete_directory(new_dir)
+        self.assertNotIn(new_dir_name, self.hook.describe_directory(parent))
+
+    def test_store_retrieve_and_delete_file(self):
+        remote_dir = os.path.join(TMP_PATH, TMP_DIR_FOR_TESTS)
+        remote_file = os.path.join(remote_dir, TMP_FILE_FOR_TESTS)
+        retrieved = os.path.join(TMP_PATH, 'retrieved.txt')
+        self.hook.store_file(
+            remote_full_path=remote_file,
+            local_full_path=os.path.join(TMP_PATH, TMP_FILE_FOR_TESTS))
+        self.assertEqual(
+            self.hook.list_directory(path=remote_dir), [TMP_FILE_FOR_TESTS])
+        self.hook.retrieve_file(
+            remote_full_path=remote_file, local_full_path=retrieved)
+        self.assertTrue(os.path.isfile(retrieved))
+        os.remove(retrieved)
+        self.hook.delete_file(path=remote_file)
+        self.assertEqual(self.hook.list_directory(path=remote_dir), [])
+
+    def test_get_mod_time(self):
+        remote_file = os.path.join(
+            TMP_PATH, TMP_DIR_FOR_TESTS, TMP_FILE_FOR_TESTS)
+        self.hook.store_file(
+            remote_full_path=remote_file,
+            local_full_path=os.path.join(TMP_PATH, TMP_FILE_FOR_TESTS))
+        output = self.hook.get_mod_time(path=remote_file)
+        # A %Y%m%d%H%M%S timestamp is always 14 characters long.
+        self.assertEqual(len(output), 14)
+
+    def tearDown(self):
+        # Release the SFTP session so connections don't pile up.
+        if self.hook.conn is not None:
+            self.hook.close_conn()
+        shutil.rmtree(os.path.join(TMP_PATH, TMP_DIR_FOR_TESTS))
+        os.remove(os.path.join(TMP_PATH, TMP_FILE_FOR_TESTS))
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to