Fokko commented on a change in pull request #3560: [AIRFLOW-2697] Drop snakebite in favour of hdfs3 URL: https://github.com/apache/incubator-airflow/pull/3560#discussion_r209463470
########## File path: airflow/sensors/hdfs_sensor.py ########## @@ -17,103 +17,231 @@ # specific language governing permissions and limitations # under the License. -import re -import sys -from builtins import str +import posixpath from airflow import settings -from airflow.hooks.hdfs_hook import HDFSHook +from airflow.hooks.hdfs_hook import HdfsHook from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults -from airflow.utils.log.logging_mixin import LoggingMixin -class HdfsSensor(BaseSensorOperator): - """ - Waits for a file or folder to land in HDFS +class HdfsFileSensor(BaseSensorOperator): + """Sensor that waits for files matching a specific (glob) pattern to land in HDFS. + + :param str file_pattern: Glob pattern to match. + :param str conn_id: Connection to use. + :param Iterable[FilePathFilter] filters: Optional list of filters that can be + used to apply further filtering to any file paths matching the glob pattern. + Any files that fail a filter are dropped from consideration. + :param int min_size: Minimum size (in MB) for files to be considered. Can be used + to filter any intermediate files that are below the expected file size. + :param Set[str] ignore_exts: File extensions to ignore. By default, files with + a '_COPYING_' extension are ignored, as these represent temporary files. 
""" - template_fields = ('filepath',) - ui_color = settings.WEB_COLORS['LIGHTBLUE'] + + template_fields = ("_pattern",) + ui_color = settings.WEB_COLORS["LIGHTBLUE"] @apply_defaults - def __init__(self, - filepath, - hdfs_conn_id='hdfs_default', - ignored_ext=None, - ignore_copying=True, - file_size=None, - hook=HDFSHook, - *args, - **kwargs): - super(HdfsSensor, self).__init__(*args, **kwargs) - if ignored_ext is None: - ignored_ext = ['_COPYING_'] - self.filepath = filepath - self.hdfs_conn_id = hdfs_conn_id - self.file_size = file_size - self.ignored_ext = ignored_ext - self.ignore_copying = ignore_copying - self.hook = hook - - @staticmethod - def filter_for_filesize(result, size=None): - """ - Will test the filepath result and test if its size is at least self.filesize - - :param result: a list of dicts returned by Snakebite ls - :param size: the file size in MB a file should be at least to trigger True - :return: (bool) depending on the matching criteria - """ - if size: - log = LoggingMixin().log - log.debug( - 'Filtering for file size >= %s in files: %s', - size, map(lambda x: x['path'], result) - ) - size *= settings.MEGABYTE - result = [x for x in result if x['length'] >= size] - log.debug('HdfsSensor.poke: after size filter result is %s', result) - return result - - @staticmethod - def filter_for_ignored_ext(result, ignored_ext, ignore_copying): - """ - Will filter if instructed to do so the result to remove matching criteria - - :param result: (list) of dicts returned by Snakebite ls - :param ignored_ext: (list) of ignored extensions - :param ignore_copying: (bool) shall we ignore ? 
- :return: (list) of dicts which were not removed - """ - if ignore_copying: - log = LoggingMixin().log - regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext) - ignored_extentions_regex = re.compile(regex_builder) - log.debug( - 'Filtering result for ignored extensions: %s in files %s', - ignored_extentions_regex.pattern, map(lambda x: x['path'], result) - ) - result = [x for x in result if not ignored_extentions_regex.match(x['path'])] - log.debug('HdfsSensor.poke: after ext filter result is %s', result) - return result + def __init__( + self, + pattern, + conn_id="hdfs_default", + filters=None, + min_size=None, + ignore_exts=("_COPYING_",), + **kwargs + ): + super(HdfsFileSensor, self).__init__(**kwargs) + + # Min-size and ignore-ext filters are added via + # arguments for backwards compatibility. + filters = list(filters or []) + + if min_size: + filters.append(SizeFilter(min_size=min_size)) + + if ignore_exts: + filters.append(ExtFilter(exts=ignore_exts)) + + self._pattern = pattern + self._conn_id = conn_id + self._filters = filters def poke(self, context): - sb = self.hook(self.hdfs_conn_id).get_conn() - self.log.info('Poking for file {self.filepath}'.format(**locals())) - try: - # IMOO it's not right here, as there no raise of any kind. - # if the filepath is let's say '/data/mydirectory', - # it's correct but if it is '/data/mydirectory/*', - # it's not correct as the directory exists and sb does not raise any error - # here is a quick fix - result = [f for f in sb.ls([self.filepath], include_toplevel=False)] - self.log.debug('HdfsSensor.poke: result is %s', result) - result = self.filter_for_ignored_ext( - result, self.ignored_ext, self.ignore_copying + with HdfsHook(self._conn_id) as hook: + conn = hook.get_conn() + + # Fetch files matching glob pattern. 
+            self.log.info("Poking for file pattern %s", self._pattern) + + try: + file_paths = [ + fp for fp in conn.glob(self._pattern) if not conn.isdir(fp) + ] + except IOError: + # File path doesn't exist yet. + file_paths = [] + + self.log.info("Files matching pattern: %s", file_paths) + + # Filter using any provided filters. + for filter_ in self._filters: + file_paths = filter_(file_paths, hook) + file_paths = list(file_paths) + + self.log.info("Filters after filtering: %s", file_paths) + + return bool(file_paths) + + +class HdfsFolderSensor(BaseSensorOperator): Review comment: Please move this sensor to a separate file. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services