Repository: incubator-airflow
Updated Branches:
  refs/heads/master 410736dbc -> 1c4cff056


[AIRFLOW-715] A more efficient HDFS Sensor:

A more efficient HDFS Sensor:

The HDFS Sensor can now trigger based on file size,
on a directory's status (empty or not), or on a
regex matching files in a directory, and it can
also ignore files that are still being copied.

With the base HDFS Sensor, it was not possible to
watch a directory for files with an unknown name.
A minimal usage sketch follows.
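
A sketch of the extended base sensor (the DAG
object, task id and HDFS path here are
hypothetical, not from the commit):

    from airflow.operators.sensors import HdfsSensor

    # Wait for at least one file of >= 5 MB in the directory,
    # skipping files that still carry the _COPYING_ extension.
    wait_for_data = HdfsSensor(
        task_id='wait_for_data',       # hypothetical task id
        filepath='/data/incoming',     # hypothetical HDFS path
        file_size=5,                   # minimum size, in MB
        ignored_ext=['_COPYING_'],
        ignore_copying=True,
        dag=dag)                       # assumes an existing DAG object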

The HDFS Sensor is now extended with two new
sensors (in contrib), sketched after this list:

  - HdfsSensorRegex : matches files in a directory
against a compiled regex (re)
  - HdfsSensorFolder : matches on a directory's
status (empty or not)
The HDFS Sensor now has two built-in filters,
sketched after this list:

  - filter_for_filesize : filters the ls results by
file size
  - filter_for_ignored_ext : discards files that
are still being copied
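
Both filters are static methods operating on the
list of dicts returned by Snakebite's ls; a sketch
with a hand-built result (field values illustrative
only):

    from airflow.operators.sensors import HdfsSensor

    result = [
        {'path': '/data/incoming/part-0001', 'length': 6291456, 'file_type': 'f'},
        {'path': '/data/incoming/part-0002._COPYING_', 'length': 0, 'file_type': 'f'},
    ]
    # Drop files that are still being copied, then keep files >= 5 MB.
    result = HdfsSensor.filter_for_ignored_ext(result, ['_COPYING_'], True)
    result = HdfsSensor.filter_for_filesize(result, size=5)
    # -> only /data/incoming/part-0001 remains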

Unit tests were added with a new FakeSnakeBiteClient
and a FakeHDFSHook.
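
Because the hook class is now injected through the
sensor's hook parameter, the tests substitute the
fake for the real HDFSHook; a minimal sketch, taken
from the tests below:

    from datetime import timedelta
    from airflow.operators.sensors import HdfsSensor
    from tests.core import FakeHDFSHook

    # The fake hook returns a FakeSnakeBiteClient instead of
    # opening a real HDFS connection.
    task = HdfsSensor(task_id='Should_be_file_legacy',
                      filepath='/datadirectory/datafile',
                      timeout=1,
                      retry_delay=timedelta(seconds=1),
                      poke_interval=1,
                      hook=FakeHDFSHook)
    task.execute(None)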

Closes #1957 from vfoucault/feature/AIRFLOW-715


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1c4cff05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1c4cff05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1c4cff05

Branch: refs/heads/master
Commit: 1c4cff056488623cfd3a6ec411e680e3e5198b21
Parents: 410736d
Author: vfoucault <[email protected]>
Authored: Sat Dec 31 14:02:07 2016 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Sat Dec 31 14:02:07 2016 +0100

----------------------------------------------------------------------
 airflow/contrib/sensors/hdfs_sensors.py |  67 +++++++
 airflow/operators/sensors.py            |  65 ++++++-
 airflow/settings.py                     |   6 +
 tests/contrib/__init__.py               |   1 +
 tests/contrib/sensors/hdfs_sensors.py   | 251 +++++++++++++++++++++++++++
 tests/core.py                           |  73 ++++++++
 tests/operators/sensors.py              |  73 +++++++-
 7 files changed, 528 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
new file mode 100644
index 0000000..a12e295
--- /dev/null
+++ b/airflow/contrib/sensors/hdfs_sensors.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.operators.sensors import HdfsSensor
+import logging
+
+
+class HdfsSensorRegex(HdfsSensor):
+    def __init__(
+            self,
+            regex,
+            *args, **kwargs):
+        super(HdfsSensorRegex, self).__init__(*args, **kwargs)
+        self.regex = regex
+
+    def poke(self, context):
+        """
+        Poke for files in a directory matching self.regex
+        :return: Bool depending on the search criteria
+        """
+        sb = self.hook(self.hdfs_conn_id).get_conn()
+        logging.getLogger("snakebite").setLevel(logging.WARNING)
+        logging.info(
+            'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals()))
+        result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
+                  f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, ''))]
+        result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
+        result = self.filter_for_filesize(result, self.file_size)
+        return bool(result)
+
+
+class HdfsSensorFolder(HdfsSensor):
+    def __init__(
+            self,
+            be_empty=False,
+            *args, **kwargs):
+        super(HdfsSensorFolder, self).__init__(*args, **kwargs)
+        self.be_empty = be_empty
+
+    def poke(self, context):
+        """
+        Poke for an empty or non-empty directory, depending on self.be_empty
+        :return: Bool depending on the search criteria
+        """
+        sb = self.hook(self.hdfs_conn_id).get_conn()
+        logging.getLogger("snakebite").setLevel(logging.WARNING)
+        result = [f for f in sb.ls([self.filepath], include_toplevel=True)]
+        result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
+        result = self.filter_for_filesize(result, self.file_size)
+        if self.be_empty:
+            logging.info('Poking for filepath {self.filepath} to be an empty directory'.format(**locals()))
+            return len(result) == 1 and result[0]['path'] == self.filepath
+        else:
+            logging.info('Poking for filepath {self.filepath} to be a non-empty directory'.format(**locals()))
+            return bool(result) and result[0]['file_type'] == 'f'
+
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 8dd1b71..da01483 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -22,12 +22,15 @@ from datetime import datetime
 import logging
 from urllib.parse import urlparse
 from time import sleep
+import re
+import sys
 
 import airflow
 from airflow import hooks, settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
 from airflow.models import BaseOperator, TaskInstance
 from airflow.hooks.base_hook import BaseHook
+from airflow.hooks.hdfs_hook import HDFSHook
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
 
@@ -371,29 +374,77 @@ class HdfsSensor(BaseSensorOperator):
     Waits for a file or folder to land in HDFS
     """
     template_fields = ('filepath',)
-    ui_color = '#4d9de0'
+    ui_color = settings.WEB_COLORS['LIGHTBLUE']
 
     @apply_defaults
     def __init__(
             self,
             filepath,
             hdfs_conn_id='hdfs_default',
+            ignored_ext=['_COPYING_'],
+            ignore_copying=True,
+            file_size=None,
+            hook=HDFSHook,
             *args, **kwargs):
         super(HdfsSensor, self).__init__(*args, **kwargs)
         self.filepath = filepath
         self.hdfs_conn_id = hdfs_conn_id
+        self.file_size = file_size
+        self.ignored_ext = ignored_ext
+        self.ignore_copying = ignore_copying
+        self.hook = hook
+
+    @staticmethod
+    def filter_for_filesize(result, size=None):
+        """
+        Will test the file sizes in the result list and keep only entries of at least the given size
+        :param result: a list of dicts returned by Snakebite ls
+        :param size: the file size in MB a file should be at least to trigger True
+        :return: (list) of dicts matching the criteria
+        """
+        if size:
+            logging.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
+            size *= settings.MEGABYTE
+            result = [x for x in result if x['length'] >= size]
+            logging.debug('HdfsSensor.poke: after size filter result is %s', result)
+        return result
+
+    @staticmethod
+    def filter_for_ignored_ext(result, ignored_ext, ignore_copying):
+        """
+        Will filter the result list, if instructed to do so, removing entries whose path matches an ignored extension
+        :param result: (list) of dicts returned by Snakebite ls
+        :param ignored_ext: (list) of ignored extensions
+        :param ignore_copying: (bool) whether to apply the filter
+        :return: (list) of dicts after filtering
+        """
+        if ignore_copying:
+            regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
+            ignored_extensions_regex = re.compile(regex_builder)
+            logging.debug('Filtering result for ignored extensions: %s in files %s', ignored_extensions_regex.pattern,
+                          map(lambda x: x['path'], result))
+            result = [x for x in result if not ignored_extensions_regex.match(x['path'])]
+            logging.debug('HdfsSensor.poke: after ext filter result is %s', result)
+        return result
 
     def poke(self, context):
-        import airflow.hooks.hdfs_hook
-        sb = airflow.hooks.hdfs_hook.HDFSHook(self.hdfs_conn_id).get_conn()
+        sb = self.hook(self.hdfs_conn_id).get_conn()
         logging.getLogger("snakebite").setLevel(logging.WARNING)
-        logging.info(
-            'Poking for file {self.filepath} '.format(**locals()))
+        logging.info('Poking for file {self.filepath} '.format(**locals()))
         try:
-            files = [f for f in sb.ls([self.filepath])]
+            # IMO this is not quite right, as no exception of any kind is raised.
+            # If the filepath is, say, '/data/mydirectory', it is correct, but if it is '/data/mydirectory/*',
+            # the directory exists and sb does not raise any error;
+            # here is a quick fix
+            result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
+            logging.debug('HdfsSensor.poke: result is %s', result)
+            result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
+            result = self.filter_for_filesize(result, self.file_size)
+            return bool(result)
         except:
+            e = sys.exc_info()
+            logging.debug("Caught an exception: %s", str(e))
             return False
-        return True
 
 
 class WebHdfsSensor(BaseSensorOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index e8da674..ce2ca92 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -147,3 +147,9 @@ except:
 
 configure_logging()
 configure_orm()
+
+# Const stuff
+
+KILOBYTE = 1024
+MEGABYTE = KILOBYTE * KILOBYTE
+WEB_COLORS = {'LIGHTBLUE': '#4d9de0'}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/tests/contrib/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/__init__.py b/tests/contrib/__init__.py
index 180273b..ff6f9e2 100644
--- a/tests/contrib/__init__.py
+++ b/tests/contrib/__init__.py
@@ -14,3 +14,4 @@
 
 from __future__ import absolute_import
 from .operators import *
+from .sensors import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/tests/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/hdfs_sensors.py b/tests/contrib/sensors/hdfs_sensors.py
new file mode 100644
index 0000000..cabe349
--- /dev/null
+++ b/tests/contrib/sensors/hdfs_sensors.py
@@ -0,0 +1,251 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import sys
+import unittest
+import re
+from datetime import timedelta
+from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex
+from airflow.exceptions import AirflowSensorTimeout
+
+
+class HdfsSensorFolderTests(unittest.TestCase):
+    def setUp(self):
+        if sys.version_info[0] == 3:
+            raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+
+    def test_should_be_empty_directory(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_empty_directory',
+                                filepath='/datadirectory/empty_directory',
+                                be_empty=True,
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens, nothing is raised exec is ok
+
+    def test_should_be_empty_directory_fail(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
+                                filepath='/datadirectory/not_empty_directory',
+                                be_empty=True,
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_be_a_non_empty_directory(self):
+        """
+        test the non-empty directory behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
+                                filepath='/datadirectory/not_empty_directory',
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens, nothing is raised exec is ok
+
+    def test_should_be_non_empty_directory_fail(self):
+        """
+        test the non-empty directory behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory_fail',
+                                filepath='/datadirectory/empty_directory',
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+
+class HdfsSensorRegexTests(unittest.TestCase):
+    def setUp(self):
+        if sys.version_info[0] == 3:
+            raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+
+    def test_should_match_regex(self):
+        """
+        test the regex matching behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens, nothing is raised exec is ok
+
+    def test_should_not_match_regex(self):
+        """
+        test the regex matching behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("^IDoNotExist")
+        task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_match_regex_and_filesize(self):
+        """
+        test the file size behaviour with regex
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               ignore_copying=True,
+                               ignored_ext=['_COPYING_', 'sftp'],
+                               file_size=10,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens, nothing is raised exec is ok
+
+    def test_should_match_regex_but_filesize(self):
+        """
+        test the file size behaviour with regex
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               file_size=20,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_match_regex_but_copyingext(self):
+        """
+        test the ignored extensions behaviour with regex
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("copying_file_\d+.txt")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_copyingext',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               ignored_ext=['_COPYING_', 'sftp'],
+                               file_size=20,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 85e7fa1..c85be2d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1773,6 +1773,7 @@ class HttpOpSensorTest(unittest.TestCase):
             dag=self.dag)
         sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+
 class FakeWebHDFSHook(object):
     def __init__(self, conn_id):
         self.conn_id = conn_id
@@ -1783,6 +1784,78 @@ class FakeWebHDFSHook(object):
     def check_for_path(self, hdfs_path):
         return hdfs_path
 
+
+class FakeSnakeBiteClientException(Exception):
+    pass
+
+
+class FakeSnakeBiteClient(object):
+
+    def __init__(self):
+        self.started = True
+
+    def ls(self, path, include_toplevel=False):
+        """
+        The fake snakebite client's ls
+        :param path: the array of paths to test
+        :param include_toplevel: whether to return the toplevel directory info
+        :return: a list of path dicts for the matching queries
+        """
+        if path[0] == '/datadirectory/empty_directory' and not include_toplevel:
+            return []
+        elif path[0] == '/datadirectory/datafile':
+            return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
+                     'owner': u'hdfs', 'path': '/datadirectory/datafile'}]
+        elif path[0] == '/datadirectory/empty_directory' and include_toplevel:
+            return [
+                {'group': u'supergroup', 'permission': 493, 'file_type': 'd', 'access_time': 0, 'block_replication': 0,
+                 'modification_time': 1481132141540, 'length': 0, 'blocksize': 0, 'owner': u'hdfs',
+                 'path': '/datadirectory/empty_directory'}]
+        elif path[0] == '/datadirectory/not_empty_directory':
+            return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
+                     'owner': u'hdfs', 'path': '/datadirectory/not_empty_directory/test_file'}]
+        elif path[0] == '/datadirectory/not_empty_directory' and include_toplevel:
+            return [
+                {'group': u'supergroup', 'permission': 493, 'file_type': 'd', 'access_time': 0, 'block_replication': 0,
+                 'modification_time': 1481132141540, 'length': 0, 'blocksize': 0, 'owner': u'hdfs',
+                 'path': '/datadirectory/empty_directory'},
+                {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                 'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
+                 'owner': u'hdfs', 'path': '/datadirectory/not_empty_directory/test_file'}]
+        elif path[0] == '/datadirectory/not_existing_file_or_directory':
+            raise FakeSnakeBiteClientException
+        elif path[0] == '/datadirectory/regex_dir':
+            return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
+                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test1file'},
+                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
+                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test2file'},
+                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
+                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test3file'},
+                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
+                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/copying_file_1.txt._COPYING_'},
+                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
+                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
+                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/copying_file_3.txt.sftp'}
+                    ]
+        else:
+            raise FakeSnakeBiteClientException
+
+
+class FakeHDFSHook(object):
+    def __init__(self, conn_id=None):
+        self.conn_id = conn_id
+
+    def get_conn(self):
+        client = FakeSnakeBiteClient()
+        return client
+
+
 class ConnectionTest(unittest.TestCase):
     def setUp(self):
         configuration.load_test_config()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1c4cff05/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index ebce1dc..e8b272b 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -15,13 +15,14 @@
 
 import logging
 import os
+import sys
 import time
 import unittest
 
 from datetime import datetime, timedelta
 
 from airflow import DAG, configuration
-from airflow.operators.sensors import HttpSensor, BaseSensorOperator
+from airflow.operators.sensors import HttpSensor, BaseSensorOperator, HdfsSensor
 from airflow.utils.decorators import apply_defaults
 from airflow.exceptions import (AirflowException,
                                 AirflowSensorTimeout,
@@ -110,3 +111,73 @@ class HttpSensorTests(unittest.TestCase):
             poke_interval=5)
         with self.assertRaisesRegexp(AirflowException, 'AirflowException raised here!'):
             task.execute(None)
+
+
+class HdfsSensorTests(unittest.TestCase):
+
+    def setUp(self):
+        if sys.version_info[0] == 3:
+            raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+
+    def test_legacy_file_exist(self):
+        """
+        Test the legacy behaviour
+        :return:
+        """
+        # Given
+        self.logger.info("Test for existing file with the legacy behaviour")
+        # When
+        task = HdfsSensor(task_id='Should_be_file_legacy',
+                          filepath='/datadirectory/datafile',
+                          timeout=1,
+                          retry_delay=timedelta(seconds=1),
+                          poke_interval=1,
+                          hook=self.hook)
+        task.execute(None)
+
+        # Then
+        # Nothing happens, nothing is raised exec is ok
+
+    def test_legacy_file_exist_but_filesize(self):
+        """
+        Test the legacy behaviour with the filesize
+        :return:
+        """
+        # Given
+        self.logger.info("Test for existing file with the legacy behaviour")
+        # When
+        task = HdfsSensor(task_id='Should_be_file_legacy_but_filesize',
+                          filepath='/datadirectory/datafile',
+                          timeout=1,
+                          file_size=20,
+                          retry_delay=timedelta(seconds=1),
+                          poke_interval=1,
+                          hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_legacy_file_does_not_exists(self):
+        """
+        Test the legacy behaviour
+        :return:
+        """
+        # Given
+        self.logger.info("Test for non existing file with the legacy 
behaviour")
+        task = HdfsSensor(task_id='Should_not_be_file_legacy',
+                          filepath='/datadirectory/not_existing_file_or_directory',
+                          timeout=1,
+                          retry_delay=timedelta(seconds=1),
+                          poke_interval=1,
+                          hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
\ No newline at end of file
