Repository: incubator-airflow
Updated Branches:
  refs/heads/master d9bf1edd4 -> bc6feea03


[AIRFLOW-1509][AIRFLOW-442] SFTP Sensor

Closes #3213 from sdiazb/sftp_sensor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bc6feea0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bc6feea0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bc6feea0

Branch: refs/heads/master
Commit: bc6feea0313325346393675792c18b531b526dca
Parents: d9bf1ed
Author: Sergio Díaz Bautista <s.diazbauti...@gmail.com>
Authored: Thu Apr 12 09:15:10 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Thu Apr 12 09:15:10 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/sensors/sftp_sensor.py    | 47 +++++++++++++++++++
 docs/code.rst                             |  1 +
 tests/contrib/sensors/test_sftp_sensor.py | 64 ++++++++++++++++++++++++++
 3 files changed, 112 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bc6feea0/airflow/contrib/sensors/sftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/sftp_sensor.py b/airflow/contrib/sensors/sftp_sensor.py
new file mode 100644
index 0000000..6d6fbdd
--- /dev/null
+++ b/airflow/contrib/sensors/sftp_sensor.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from paramiko import SFTP_NO_SUCH_FILE
+from airflow.contrib.hooks.sftp_hook import SFTPHook
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SFTPSensor(BaseSensorOperator):
+    """
+    Waits for a file or directory to be present on SFTP.
+    :param path: Remote file or directory path
+    :type path: str
+    :param sftp_conn_id: The connection to run the sensor against
+    :type sftp_conn_id: str
+    """
+    template_fields = ('path',)
+
+    @apply_defaults
+    def __init__(self, path, sftp_conn_id='sftp_default', *args, **kwargs):
+        super(SFTPSensor, self).__init__(*args, **kwargs)
+        self.path = path
+        self.hook = SFTPHook(sftp_conn_id=sftp_conn_id)
+
+    def poke(self, context):
+        logging.info('Poking for %s', self.path)
+        try:
+            self.hook.get_mod_time(self.path)
+        except IOError as e:
+            if e.errno != SFTP_NO_SUCH_FILE:
+                raise e
+            return False
+        self.hook.close_conn()
+        return True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bc6feea0/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index a30d117..97327b1 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -203,6 +203,7 @@ Sensors
 .. autoclass:: airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor
 .. autoclass:: airflow.contrib.sensors.qubole_sensor.QuboleSensor
 .. autoclass:: airflow.contrib.sensors.redis_key_sensor.RedisKeySensor
+.. autoclass:: airflow.contrib.sensors.sftp_sensor.SFTPSensor
 .. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor
 
 .. _macros:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bc6feea0/tests/contrib/sensors/test_sftp_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_sftp_sensor.py b/tests/contrib/sensors/test_sftp_sensor.py
new file mode 100644
index 0000000..091b95f
--- /dev/null
+++ b/tests/contrib/sensors/test_sftp_sensor.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch
+from paramiko import SFTP_NO_SUCH_FILE, SFTP_FAILURE
+from airflow.contrib.sensors.sftp_sensor import SFTPSensor
+
+
+class SFTPSensorTest(unittest.TestCase):
+    @patch('airflow.contrib.sensors.sftp_sensor.SFTPHook')
+    def test_file_present(self, sftp_hook_mock):
+        sftp_hook_mock.return_value.get_mod_time.return_value = '19700101000000'
+        sftp_sensor = SFTPSensor(
+            task_id='unit_test',
+            path='/path/to/file/1970-01-01.txt')
+        context = {
+            'ds': '1970-01-01'
+        }
+        output = sftp_sensor.poke(context)
+        sftp_hook_mock.return_value.get_mod_time.assert_called_with(
+            '/path/to/file/1970-01-01.txt')
+        self.assertTrue(output)
+
+    @patch('airflow.contrib.sensors.sftp_sensor.SFTPHook')
+    def test_file_absent(self, sftp_hook_mock):
+        sftp_hook_mock.return_value.get_mod_time.side_effect = IOError(
+            SFTP_NO_SUCH_FILE, 'File missing')
+        sftp_sensor = SFTPSensor(
+            task_id='unit_test',
+            path='/path/to/file/1970-01-01.txt')
+        context = {
+            'ds': '1970-01-01'
+        }
+        output = sftp_sensor.poke(context)
+        sftp_hook_mock.return_value.get_mod_time.assert_called_with(
+            '/path/to/file/1970-01-01.txt')
+        self.assertFalse(output)
+
+    @patch('airflow.contrib.sensors.sftp_sensor.SFTPHook')
+    def test_sftp_failure(self, sftp_hook_mock):
+        sftp_hook_mock.return_value.get_mod_time.side_effect = IOError(
+            SFTP_FAILURE, 'SFTP failure')
+        sftp_sensor = SFTPSensor(
+            task_id='unit_test',
+            path='/path/to/file/1970-01-01.txt')
+        context = {
+            'ds': '1970-01-01'
+        }
+        with self.assertRaises(IOError):
+            sftp_sensor.poke(context)
+            sftp_hook_mock.return_value.get_mod_time.assert_called_with(
+                '/path/to/file/1970-01-01.txt')

Reply via email to