Fokko commented on a change in pull request #3560: [AIRFLOW-2697] Drop snakebite in favour of hdfs3
URL: https://github.com/apache/incubator-airflow/pull/3560#discussion_r209463470
 
 

 ##########
 File path: airflow/sensors/hdfs_sensor.py
 ##########
 @@ -17,103 +17,231 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import re
-import sys
-from builtins import str
+import posixpath
 
 from airflow import settings
-from airflow.hooks.hdfs_hook import HDFSHook
+from airflow.hooks.hdfs_hook import HdfsHook
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
-from airflow.utils.log.logging_mixin import LoggingMixin
 
 
-class HdfsSensor(BaseSensorOperator):
-    """
-    Waits for a file or folder to land in HDFS
+class HdfsFileSensor(BaseSensorOperator):
+    """Sensor that waits for files matching a specific (glob) pattern to land 
in HDFS.
+
+    :param str file_pattern: Glob pattern to match.
+    :param str conn_id: Connection to use.
+    :param Iterable[FilePathFilter] filters: Optional list of filters that can be
+        used to apply further filtering to any file paths matching the glob pattern.
+        Any files that fail a filter are dropped from consideration.
+    :param int min_size: Minimum size (in MB) for files to be considered. Can be used
+        to filter any intermediate files that are below the expected file size.
+    :param Set[str] ignore_exts: File extensions to ignore. By default, files with
+        a '_COPYING_' extension are ignored, as these represent temporary files.
     """
-    template_fields = ('filepath',)
-    ui_color = settings.WEB_COLORS['LIGHTBLUE']
+
+    template_fields = ("_pattern",)
+    ui_color = settings.WEB_COLORS["LIGHTBLUE"]
 
     @apply_defaults
-    def __init__(self,
-                 filepath,
-                 hdfs_conn_id='hdfs_default',
-                 ignored_ext=None,
-                 ignore_copying=True,
-                 file_size=None,
-                 hook=HDFSHook,
-                 *args,
-                 **kwargs):
-        super(HdfsSensor, self).__init__(*args, **kwargs)
-        if ignored_ext is None:
-            ignored_ext = ['_COPYING_']
-        self.filepath = filepath
-        self.hdfs_conn_id = hdfs_conn_id
-        self.file_size = file_size
-        self.ignored_ext = ignored_ext
-        self.ignore_copying = ignore_copying
-        self.hook = hook
-
-    @staticmethod
-    def filter_for_filesize(result, size=None):
-        """
-        Will test the filepath result and test if its size is at least self.filesize
-
-        :param result: a list of dicts returned by Snakebite ls
-        :param size: the file size in MB a file should be at least to trigger True
-        :return: (bool) depending on the matching criteria
-        """
-        if size:
-            log = LoggingMixin().log
-            log.debug(
-                'Filtering for file size >= %s in files: %s',
-                size, map(lambda x: x['path'], result)
-            )
-            size *= settings.MEGABYTE
-            result = [x for x in result if x['length'] >= size]
-            log.debug('HdfsSensor.poke: after size filter result is %s', result)
-        return result
-
-    @staticmethod
-    def filter_for_ignored_ext(result, ignored_ext, ignore_copying):
-        """
-        Will filter if instructed to do so the result to remove matching criteria
-
-        :param result: (list) of dicts returned by Snakebite ls
-        :param ignored_ext: (list) of ignored extensions
-        :param ignore_copying: (bool) shall we ignore ?
-        :return: (list) of dicts which were not removed
-        """
-        if ignore_copying:
-            log = LoggingMixin().log
-            regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
-            ignored_extentions_regex = re.compile(regex_builder)
-            log.debug(
-                'Filtering result for ignored extensions: %s in files %s',
-                ignored_extentions_regex.pattern, map(lambda x: x['path'], result)
-            )
-            result = [x for x in result if not ignored_extentions_regex.match(x['path'])]
-            log.debug('HdfsSensor.poke: after ext filter result is %s', result)
-        return result
+    def __init__(
+        self,
+        pattern,
+        conn_id="hdfs_default",
+        filters=None,
+        min_size=None,
+        ignore_exts=("_COPYING_",),
+        **kwargs
+    ):
+        super(HdfsFileSensor, self).__init__(**kwargs)
+
+        # Min-size and ignore-ext filters are added via
+        # arguments for backwards compatibility.
+        filters = list(filters or [])
+
+        if min_size:
+            filters.append(SizeFilter(min_size=min_size))
+
+        if ignore_exts:
+            filters.append(ExtFilter(exts=ignore_exts))
+
+        self._pattern = pattern
+        self._conn_id = conn_id
+        self._filters = filters
 
     def poke(self, context):
-        sb = self.hook(self.hdfs_conn_id).get_conn()
-        self.log.info('Poking for file {self.filepath}'.format(**locals()))
-        try:
-            # IMOO it's not right here, as there no raise of any kind.
-            # if the filepath is let's say '/data/mydirectory',
-            # it's correct but if it is '/data/mydirectory/*',
-            # it's not correct as the directory exists and sb does not raise any error
-            # here is a quick fix
-            result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
-            self.log.debug('HdfsSensor.poke: result is %s', result)
-            result = self.filter_for_ignored_ext(
-                result, self.ignored_ext, self.ignore_copying
+        with HdfsHook(self._conn_id) as hook:
+            conn = hook.get_conn()
+
+            # Fetch files matching glob pattern.
+            self.log.info("Poking for file pattern %s", self._pattern)
+
+            try:
+                file_paths = [
+                    fp for fp in conn.glob(self._pattern) if not conn.isdir(fp)
+                ]
+            except IOError:
+                # File path doesn't exist yet.
+                file_paths = []
+
+            self.log.info("Files matching pattern: %s", file_paths)
+
+            # Filter using any provided filters.
+            for filter_ in self._filters:
+                file_paths = filter_(file_paths, hook)
+            file_paths = list(file_paths)
+
+            self.log.info("Filters after filtering: %s", file_paths)
+
+            return bool(file_paths)
+
+
+class HdfsFolderSensor(BaseSensorOperator):
 
 Review comment:
   Please move this sensor to a separate file.
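
   As a side note for reviewers: the `SizeFilter` and `ExtFilter` classes used in `__init__` are not visible in this hunk. Judging from `poke()`, each filter is a callable taking `(file_paths, hook)` and returning the paths that survive. A minimal sketch of what such filters could look like follows; the bodies below are assumptions for illustration, not the PR's actual implementation:

       import posixpath

       from airflow import settings


       class SizeFilter(object):
           """Drops files smaller than min_size (in MB). Sketch only."""

           def __init__(self, min_size):
               # Mirrors the old sensor's use of settings.MEGABYTE.
               self._min_bytes = min_size * settings.MEGABYTE

           def __call__(self, file_paths, hook):
               conn = hook.get_conn()
               # Assumes hdfs3's info() dict exposes the file size as 'size'.
               return [fp for fp in file_paths
                       if conn.info(fp)["size"] >= self._min_bytes]


       class ExtFilter(object):
           """Drops files whose extension is in the ignored set. Sketch only."""

           def __init__(self, exts):
               self._exts = set(exts)

           def __call__(self, file_paths, hook):
               return [fp for fp in file_paths
                       if posixpath.splitext(fp)[1].lstrip(".") not in self._exts]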

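   For completeness, a hypothetical usage sketch of the new sensor inside a DAG (the task id, pattern, and `dag` object are made up; note that `_pattern` is a templated field, so Jinja macros such as `{{ ds }}` should work):

       from airflow.sensors.hdfs_sensor import HdfsFileSensor

       wait_for_exports = HdfsFileSensor(
           task_id="wait_for_exports",
           pattern="/data/exports/{{ ds }}/*.csv",
           conn_id="hdfs_default",
           min_size=1,                    # require at least 1 MB per file
           ignore_exts={"_COPYING_"},     # the default: skip in-flight copies
           dag=dag,
       )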