Repository: incubator-airflow Updated Branches: refs/heads/master 4e8fdb50a -> 4751abf8a
[AIRFLOW-2073] Make FileSensor fail when the file doesn't exist Previously it never returned False because os.walk never raised an error (by default it silently ignores a path that doesn't exist). It also wasn't clear what behaviour this sensor should have when given a directory. I think the most useful behaviour would be to wait for any files to exist, rather than returning as soon as the directory itself exists. Closes #3017 from ashb/AIRFLOW-2073-file-sensor-never-failed Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4751abf8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4751abf8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4751abf8 Branch: refs/heads/master Commit: 4751abf8acad766cb576ecfe3a333d68cc693b8c Parents: 4e8fdb5 Author: Ash Berlin-Taylor <[email protected]> Authored: Thu Feb 8 21:50:57 2018 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Thu Feb 8 21:51:02 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/sensors/file_sensor.py | 18 ++++++-- tests/contrib/sensors/test_file_sensor.py | 64 ++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4751abf8/airflow/contrib/sensors/file_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/file_sensor.py b/airflow/contrib/sensors/file_sensor.py index fdc1e67..3e30116 100644 --- a/airflow/contrib/sensors/file_sensor.py +++ b/airflow/contrib/sensors/file_sensor.py @@ -13,7 +13,8 @@ # limitations under the License. 
# -from os import walk +import os +import stat from airflow.contrib.hooks.fs_hook import FSHook from airflow.sensors.base_sensor_operator import BaseSensorOperator @@ -22,7 +23,10 @@ from airflow.utils.decorators import apply_defaults class FileSensor(BaseSensorOperator): """ - Waits for a file or folder to land in a filesystem + Waits for a file or folder to land in a filesystem. + + If the path given is a directory then this sensor will only return true if + any files exist inside it (either directly, or within a subdirectory) :param fs_conn_id: reference to the File (path) connection id @@ -50,7 +54,13 @@ class FileSensor(BaseSensorOperator): full_path = "/".join([basepath, self.filepath]) self.log.info('Poking for file {full_path}'.format(**locals())) try: - files = [f for f in walk(full_path)] + if stat.S_ISDIR(os.stat(full_path).st_mode): + for root, dirs, files in os.walk(full_path): + if len(files): + return True + else: + # full_path was a file directly + return True except OSError: return False - return True + return False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4751abf8/tests/contrib/sensors/test_file_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_file_sensor.py b/tests/contrib/sensors/test_file_sensor.py index 7bc559d..5efdd58 100644 --- a/tests/contrib/sensors/test_file_sensor.py +++ b/tests/contrib/sensors/test_file_sensor.py @@ -14,9 +14,12 @@ # import unittest +import shutil +import tempfile from airflow import configuration from airflow import models, DAG +from airflow.exceptions import AirflowSensorTimeout from airflow.contrib.sensors.file_sensor import FileSensor from airflow.settings import Session from airflow.utils.timezone import datetime @@ -53,14 +56,69 @@ class FileSensorTest(unittest.TestCase): self.dag = dag def test_simple(self): + with tempfile.NamedTemporaryFile() as tmp: + task = FileSensor( + task_id="test", + filepath=tmp.name[1:], + 
fs_conn_id='fs_default', + dag=self.dag, + timeout=0, + ) + task._hook = self.hook + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, + ignore_ti_state=True) + + def test_file_in_nonexistent_dir(self): + dir = tempfile.mkdtemp() + task = FileSensor( + task_id="test", + filepath=dir[1:] + "/file", + fs_conn_id='fs_default', + dag=self.dag, + timeout=0, + ) + task._hook = self.hook + try: + with self.assertRaises(AirflowSensorTimeout): + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, + ignore_ti_state=True) + finally: + shutil.rmtree(dir) + + def test_empty_dir(self): + dir = tempfile.mkdtemp() + task = FileSensor( + task_id="test", + filepath=dir[1:], + fs_conn_id='fs_default', + dag=self.dag, + timeout=0, + ) + task._hook = self.hook + try: + with self.assertRaises(AirflowSensorTimeout): + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, + ignore_ti_state=True) + finally: + shutil.rmtree(dir) + + def test_file_in_dir(self): + dir = tempfile.mkdtemp() task = FileSensor( task_id="test", - filepath="etc/hosts", + filepath=dir[1:], fs_conn_id='fs_default', - _hook=self.hook, dag=self.dag, + timeout=0, ) - task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + task._hook = self.hook + try: + # `touch` the dir + open(dir + "/file", "a").close() + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, + ignore_ti_state=True) + finally: + shutil.rmtree(dir) if __name__ == '__main__':
