This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch s3-support in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 171225e6dec6827d806b3208ffa0d9fb0941b7a0 Author: Eamon Ford <[email protected]> AuthorDate: Fri Oct 2 13:33:46 2020 -0700 Move directory scanning out of Collection class --- .../collection_manager/entities/Collection.py | 3 --- collection_manager/collection_manager/main.py | 1 - .../collection_manager/services/CollectionProcessor.py | 18 ++++++------------ .../collection_manager/services/CollectionWatcher.py | 8 +++++--- .../services/history_manager/IngestionHistory.py | 17 +++++++++-------- 5 files changed, 20 insertions(+), 27 deletions(-) diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py index 031a3a9..0feba0e 100644 --- a/collection_manager/collection_manager/entities/Collection.py +++ b/collection_manager/collection_manager/entities/Collection.py @@ -53,6 +53,3 @@ class Collection: return os.path.dirname(file_path) == self.path else: return fnmatch(file_path, self.path) - - def files_owned(self) -> List[str]: - return glob(self.path, recursive=True) diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index cbe22f9..3de4fdd 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -70,7 +70,6 @@ async def main(): collection_processor = CollectionProcessor(message_publisher=publisher, history_manager_builder=history_manager_builder) collection_watcher = CollectionWatcher(collections_path=options.collections_path, - collection_updated_callback=collection_processor.process_collection, granule_updated_callback=collection_processor.process_granule, collections_refresh_interval=int(options.refresh)) diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py index 975f50c..ab8ce95 100644 --- a/collection_manager/collection_manager/services/CollectionProcessor.py +++ 
b/collection_manager/collection_manager/services/CollectionProcessor.py @@ -1,12 +1,15 @@ import logging import os.path +from glob import glob from typing import Dict -import yaml +import yaml from collection_manager.entities import Collection from collection_manager.services import MessagePublisher -from collection_manager.services.history_manager import IngestionHistory, GranuleStatus -from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder +from collection_manager.services.history_manager import (GranuleStatus, + IngestionHistory) +from collection_manager.services.history_manager.IngestionHistory import \ + IngestionHistoryBuilder logger = logging.getLogger(__name__) @@ -20,15 +23,6 @@ class CollectionProcessor: self._history_manager_builder = history_manager_builder self._history_manager_cache: Dict[str, IngestionHistory] = {} - async def process_collection(self, collection: Collection): - """ - Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each. - :param collection: A Collection definition - :return: None - """ - for granule in collection.files_owned(): - await self.process_granule(granule, collection) - async def process_granule(self, granule: str, collection: Collection): """ Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it. 
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 1c7c1be..20ec7c7 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -3,6 +3,7 @@ import logging import os import time from collections import defaultdict +from glob import glob from typing import Awaitable, Callable, Dict, Optional, Set import yaml @@ -21,14 +22,12 @@ logger.setLevel(logging.DEBUG) class CollectionWatcher: def __init__(self, collections_path: str, - collection_updated_callback: Callable[[Collection], Awaitable], granule_updated_callback: Callable[[str, Collection], Awaitable], collections_refresh_interval: float = 30): if not os.path.isabs(collections_path): raise RelativePathError("Collections config path must be an absolute path.") self._collections_path = collections_path - self._collection_updated_callback = collection_updated_callback self._granule_updated_callback = granule_updated_callback self._collections_refresh_interval = collections_refresh_interval @@ -107,7 +106,10 @@ class CollectionWatcher: logger.info(f"Scanning files for {len(updated_collections)} collections...") start = time.perf_counter() for collection in updated_collections: - await self._collection_updated_callback(collection) + files_owned = glob(collection.path, recursive=True) + for granule in files_owned: + await self._granule_updated_callback(granule, collection) + logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.") self._unschedule_watches() diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py index b71c32f..231d179 100644 --- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py +++ 
b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py @@ -48,10 +48,11 @@ class IngestionHistory(ABC): signature = self._signature_fun(file_path) await self._push_record(file_name, signature) + file_modified_date = os.path.getmtime(file_path) if not self._latest_ingested_file_update: - self._latest_ingested_file_update = os.path.getmtime(file_path) + self._latest_ingested_file_update = file_modified_date else: - self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path)) + self._latest_ingested_file_update = max(self._latest_ingested_file_update, file_modified_date) await self._save_latest_timestamp() @@ -73,9 +74,10 @@ class IngestionHistory(ABC): should fall in order to be "desired". :return: A GranuleStatus enum. """ - if self._in_time_range(file_path, date_from=self._latest_ingested_mtime()): + file_modified_date = os.path.getmtime(file_path) + if self._in_time_range(file_modified_date, start_date=self._latest_ingested_mtime()): return GranuleStatus.DESIRED_FORWARD_PROCESSING - elif self._in_time_range(file_path, date_from, date_to) and not await self._already_ingested(file_path): + elif self._in_time_range(file_modified_date, date_from, date_to) and not await self._already_ingested(file_path): return GranuleStatus.DESIRED_HISTORICAL else: return GranuleStatus.UNDESIRED @@ -114,15 +116,14 @@ class IngestionHistory(ABC): pass @staticmethod - def _in_time_range(file, date_from: datetime = None, date_to: datetime = None): + def _in_time_range(date: datetime, start_date: datetime = None, end_date: datetime = None): """ :param file: file path as a string :param date_from: timestamp, can be None :param date_to: timestamp, can be None :return: True if the update time of the file is between ts_from and ts_to. 
False otherwise """ - file_modified_time = os.path.getmtime(file) - is_after_from = date_from.timestamp() < file_modified_time if date_from else True - is_before_to = date_to.timestamp() > file_modified_time if date_to else True + is_after_from = start_date.timestamp() < date if start_date else True + is_before_to = end_date.timestamp() > date if end_date else True return is_after_from and is_before_to
