Fokko commented on a change in pull request #3560: [AIRFLOW-2697] Drop snakebite in favour of hdfs3
URL: https://github.com/apache/incubator-airflow/pull/3560#discussion_r209463485
 
 

 ##########
 File path: tests/contrib/sensors/test_hdfs_sensor.py
 ##########
 @@ -7,247 +7,199 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import logging
-import unittest
 
+import datetime as dt
 import re
-from datetime import timedelta
+import unittest
+import warnings
 
-from airflow.contrib.sensors.hdfs_sensor import HdfsSensorFolder, HdfsSensorRegex
-from airflow.exceptions import AirflowSensorTimeout
+from airflow import models
+from airflow.contrib.sensors.hdfs_sensor import (HdfsSensorFolder,
+                                                 HdfsSensorRegex,
+                                                 HdfsRegexFileSensor)
 
+from tests.sensors.test_hdfs_sensor import MockHdfs3Client
+
+
+class HdfsRegexFileSensorTests(unittest.TestCase):
+    """Tests for the HdfsRegexFileSensor class."""
 
-class HdfsSensorFolderTests(unittest.TestCase):
-    def setUp(self):
-        from tests.core import FakeHDFSHook
-        self.hook = FakeHDFSHook
-        self.log = logging.getLogger()
-        self.log.setLevel(logging.DEBUG)
-
-    def test_should_be_empty_directory(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_empty_directory',
-                                filepath='/datadirectory/empty_directory',
-                                be_empty=True,
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_be_empty_directory_fail(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
-                                filepath='/datadirectory/not_empty_directory',
-                                be_empty=True,
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_should_be_a_non_empty_directory(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
-                                filepath='/datadirectory/not_empty_directory',
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_be_non_empty_directory_fail(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
-                                filepath='/datadirectory/empty_directory',
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-
-class HdfsSensorRegexTests(unittest.TestCase):
     def setUp(self):
-        from tests.core import FakeHDFSHook
-        self.hook = FakeHDFSHook
-        self.log = logging.getLogger()
-        self.log.setLevel(logging.DEBUG)
+        file_details = [
+            {
+                'kind': 'directory',
+                'name': '/data',
+                'size': 0
+            },
+            {
+                'kind': 'file',
+                'name': '/data/test1file',
+                'size': 2000000
+            },
+            {
+                'kind': 'file',
+                'name': '/data/copying._COPYING_',
+                'size': 2000000
+            }
+        ]
+
+        self._mock_client, self._mock_params = \
+            MockHdfs3Client.from_file_details(file_details, test_instance=self)
+
+        self._default_task_kws = {
+            'timeout': 1,
+            'retry_delay': dt.timedelta(seconds=1),
+            'poke_interval': 1
+        }
 
     def test_should_match_regex(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("test[1-2]file")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
+        """Tests example where files should match regex."""
+
+        regex = re.compile("test[1-2]file")
+        task = HdfsRegexFileSensor(
+            task_id='should_match_the_regex',
+            pattern='/data/*',
+            regex=regex,
+            **self._default_task_kws)
+        self.assertTrue(task.poke(context={}))
+
+    def test_should_match_regex_dir(self):
+        """Tests example where files should match regex with dir path."""
+
+        regex = re.compile("test[1-2]file")
+        task = HdfsRegexFileSensor(
+            task_id='should_match_the_regex',
+            pattern='/data',
+            regex=regex,
+            **self._default_task_kws)
+        self.assertTrue(task.poke(context={}))
 
     def test_should_not_match_regex(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("^IDoNotExist")
-        task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_should_match_regex_and_filesize(self):
-        """
-        test the file size behaviour with regex
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("test[1-2]file")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               ignore_copying=True,
-                               ignored_ext=['_COPYING_', 'sftp'],
-                               file_size=10,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_match_regex_but_filesize(self):
-        """
-        test the file size behaviour with regex
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("test[1-2]file")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               file_size=20,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_should_match_regex_but_copyingext(self):
-        """
-        test the file size behaviour with regex
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("copying_file_\d+.txt")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               ignored_ext=['_COPYING_', 'sftp'],
-                               file_size=20,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
+        """Tests example where files should match regex."""
+
+        regex = re.compile("^IDoNotExist")
+        task = HdfsRegexFileSensor(
+            task_id='should_not_match_the_regex',
+            pattern='/data/*',
+            regex=regex,
+            **self._default_task_kws)
+        self.assertFalse(task.poke(context={}))
+
+    def test_should_match_regex_and_size(self):
+        """Tests example with matching regex and sufficient file size."""
+
+        regex = re.compile("test[1-2]file")
+        task = HdfsRegexFileSensor(
+            task_id='should_match_the_regex_and_size',
+            pattern='/data/*',
+            regex=regex,
+            min_size=1,
+            **self._default_task_kws)
+        self.assertTrue(task.poke(context={}))
+
+    def test_should_match_regex_not_size(self):
+        """Tests example with matching regex but too small file size."""
+
+        regex = re.compile("test[1-2]file")
+        task = HdfsRegexFileSensor(
+            task_id='should_match_the_regex_but_not_size',
+            pattern='/data/*',
+            regex=regex,
+            min_size=10,
+            **self._default_task_kws)
+        self.assertFalse(task.poke(context={}))
+
+    def test_should_match_regex_not_ext(self):
+        """Tests example with matching regex but wrong ext."""
+
+        regex = re.compile("test[1-2]file")
+        task = HdfsRegexFileSensor(
+            task_id='should_match_the_regex_but_not_size',
+            pattern='/data/*',
+            regex=regex,
+            min_size=10,
+            **self._default_task_kws)
+        self.assertFalse(task.poke(context={}))
+
+        compiled_regex = re.compile("copying.*")
+        task = HdfsRegexFileSensor(
+            task_id='should_match_the_regex_but_not_ext',
+            pattern='/data/*',
+            regex=compiled_regex,
+            ignore_exts=['_COPYING_'],
+            **self._default_task_kws)
+        self.assertFalse(task.poke(context={}))
+
+    def test_calling_old_class(self):
+        """Tests call to old class."""
+
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+
+            regex = re.compile("test[1-2]file")
+            task = HdfsSensorRegex(
+                task_id='should_match_the_regex_old',
+                pattern='/data',
+                regex=regex,
+                **self._default_task_kws)
+            self.assertTrue(task.poke(context={}))
+
+
+class HdfsSensorFolderTests(unittest.TestCase):
 
 Review comment:
   For the tests as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to