Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4e8fdb50a -> 4751abf8a


[AIRFLOW-2073] Make FileSensor fail when the file doesn't exist

Previously it never returned False because os.walk
never failed.

It also wasn't clear what behaviour this sensor should have when given
a directory. I think the most useful behaviour would be to wait for any
files to exist, rather than returning as soon as the directory itself
exists.

Closes #3017 from ashb/AIRFLOW-2073-file-sensor-never-failed


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4751abf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4751abf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4751abf8

Branch: refs/heads/master
Commit: 4751abf8acad766cb576ecfe3a333d68cc693b8c
Parents: 4e8fdb5
Author: Ash Berlin-Taylor <[email protected]>
Authored: Thu Feb 8 21:50:57 2018 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Thu Feb 8 21:51:02 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/sensors/file_sensor.py    | 18 ++++++--
 tests/contrib/sensors/test_file_sensor.py | 64 ++++++++++++++++++++++++--
 2 files changed, 75 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4751abf8/airflow/contrib/sensors/file_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/file_sensor.py b/airflow/contrib/sensors/file_sensor.py
index fdc1e67..3e30116 100644
--- a/airflow/contrib/sensors/file_sensor.py
+++ b/airflow/contrib/sensors/file_sensor.py
@@ -13,7 +13,8 @@
 # limitations under the License.
 #
 
-from os import walk
+import os
+import stat
 
 from airflow.contrib.hooks.fs_hook import FSHook
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
@@ -22,7 +23,10 @@ from airflow.utils.decorators import apply_defaults
 
 class FileSensor(BaseSensorOperator):
     """
-    Waits for a file or folder to land in a filesystem
+    Waits for a file or folder to land in a filesystem.
+
+    If the path given is a directory then this sensor will only return true if
+    any files exist inside it (either directly, or within a subdirectory)
 
     :param fs_conn_id: reference to the File (path)
         connection id
@@ -50,7 +54,13 @@ class FileSensor(BaseSensorOperator):
         full_path = "/".join([basepath, self.filepath])
         self.log.info('Poking for file {full_path}'.format(**locals()))
         try:
-            files = [f for f in walk(full_path)]
+            if stat.S_ISDIR(os.stat(full_path).st_mode):
+                for root, dirs, files in os.walk(full_path):
+                    if len(files):
+                        return True
+            else:
+                # full_path was a file directly
+                return True
         except OSError:
             return False
-        return True
+        return False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4751abf8/tests/contrib/sensors/test_file_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_file_sensor.py b/tests/contrib/sensors/test_file_sensor.py
index 7bc559d..5efdd58 100644
--- a/tests/contrib/sensors/test_file_sensor.py
+++ b/tests/contrib/sensors/test_file_sensor.py
@@ -14,9 +14,12 @@
 #
 
 import unittest
+import shutil
+import tempfile
 
 from airflow import configuration
 from airflow import models, DAG
+from airflow.exceptions import AirflowSensorTimeout
 from airflow.contrib.sensors.file_sensor import FileSensor
 from airflow.settings import Session
 from airflow.utils.timezone import datetime
@@ -53,14 +56,69 @@ class FileSensorTest(unittest.TestCase):
         self.dag = dag
 
     def test_simple(self):
+        with tempfile.NamedTemporaryFile() as tmp:
+            task = FileSensor(
+                task_id="test",
+                filepath=tmp.name[1:],
+                fs_conn_id='fs_default',
+                dag=self.dag,
+                timeout=0,
+            )
+            task._hook = self.hook
+            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                     ignore_ti_state=True)
+
+    def test_file_in_nonexistent_dir(self):
+        dir = tempfile.mkdtemp()
+        task = FileSensor(
+            task_id="test",
+            filepath=dir[1:] + "/file",
+            fs_conn_id='fs_default',
+            dag=self.dag,
+            timeout=0,
+        )
+        task._hook = self.hook
+        try:
+            with self.assertRaises(AirflowSensorTimeout):
+                task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                         ignore_ti_state=True)
+        finally:
+            shutil.rmtree(dir)
+
+    def test_empty_dir(self):
+        dir = tempfile.mkdtemp()
+        task = FileSensor(
+            task_id="test",
+            filepath=dir[1:],
+            fs_conn_id='fs_default',
+            dag=self.dag,
+            timeout=0,
+        )
+        task._hook = self.hook
+        try:
+            with self.assertRaises(AirflowSensorTimeout):
+                task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                         ignore_ti_state=True)
+        finally:
+            shutil.rmtree(dir)
+
+    def test_file_in_dir(self):
+        dir = tempfile.mkdtemp()
         task = FileSensor(
             task_id="test",
-            filepath="etc/hosts",
+            filepath=dir[1:],
             fs_conn_id='fs_default',
-            _hook=self.hook,
             dag=self.dag,
+            timeout=0,
         )
-        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+        task._hook = self.hook
+        try:
+            # `touch` the dir
+            open(dir + "/file", "a").close()
+            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                     ignore_ti_state=True)
+        finally:
+            shutil.rmtree(dir)
 
 
 if __name__ == '__main__':

Reply via email to