This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch s3-support in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit b87d30e16e88d06d65341dc225bf5ee47dcc7256 Author: Eamon Ford <[email protected]> AuthorDate: Thu Oct 29 16:51:02 2020 -0700 it works --- collection_manager/collection_manager/main.py | 2 +- .../services/CollectionProcessor.py | 7 ++++-- .../services/CollectionWatcher.py | 6 +++--- .../services/history_manager/IngestionHistory.py | 25 +++++++++------------- .../history_manager/SolrIngestionHistory.py | 2 +- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index 3dba6e0..782c70e 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -61,7 +61,7 @@ def get_args() -> argparse.Namespace: async def main(): try: options = get_args() - ENABLE_S3 = False + ENABLE_S3 = True if ENABLE_S3: signature_fun = None diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py index 89d413b..96c461e 100644 --- a/collection_manager/collection_manager/services/CollectionProcessor.py +++ b/collection_manager/collection_manager/services/CollectionProcessor.py @@ -24,7 +24,7 @@ class CollectionProcessor: self._history_manager_builder = history_manager_builder self._history_manager_cache: Dict[str, IngestionHistory] = {} - async def process_granule(self, granule: str, modified_time: datetime, collection: Collection): + async def process_granule(self, granule: str, modified_time: int, collection: Collection): """ Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it. :param granule: A path to a granule file @@ -35,7 +35,10 @@ class CollectionProcessor: return history_manager = self._get_history_manager(collection.dataset_id) - granule_status = await history_manager.get_granule_status(granule, modified_time, collection.date_from, collection.date_to) + granule_status = await history_manager.get_granule_status(granule, + modified_time, + collection.date_from, + collection.date_to) if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING: logger.info(f"New granule '{granule}' detected for forward-processing ingestion " diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 1fb6abd..abd4a11 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -121,7 +121,7 @@ class CollectionWatcher: start = time.perf_counter() for collection in collections: for granule_path in glob(collection.path, recursive=True): - modified_time = os.path.getmtime(granule_path) + modified_time = int(os.path.getmtime(granule_path)) await self._granule_updated_callback(granule_path, modified_time, collection) logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.") @@ -207,9 +207,9 @@ class _GranuleEventHandler(FileSystemEventHandler): try: if collection.owns_file(path): if isinstance(event, S3Event): - modified_time = event.modified_time + modified_time = int(event.modified_time.timestamp()) else: - modified_time = datetime.fromtimestamp(os.path.getmtime(path)) + modified_time = int(os.path.getmtime(path)) self._loop.create_task(self._callback(path, modified_time, collection)) except IsADirectoryError: return diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py index d901690..7f33c79 100644 --- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py @@ -38,17 +38,16 @@ class GranuleStatus(Enum): class IngestionHistory(ABC): _signature_fun = None - _latest_ingested_file_update: float = None + _latest_ingested_file_update: int = None - async def push(self, file_path: str, modified_datetime: datetime): + async def push(self, file_path: str, modified_timestamp: int): """ Record a file as having been ingested. :param file_path: The full path to the file to record. :return: None """ - modified_timestamp = int(modified_datetime.timestamp()) file_name = IngestionHistory._get_standardized_path(file_path) - signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_timestamp) + signature = self._signature_fun(file_path) if self._signature_fun else str(modified_timestamp) await self._push_record(file_name, signature) if not self._latest_ingested_file_update: @@ -60,7 +59,7 @@ class IngestionHistory(ABC): async def get_granule_status(self, file_path: str, - modified_datetime: datetime, + modified_timestamp: int, date_from: datetime = None, date_to: datetime = None) -> GranuleStatus: """ @@ -77,11 +76,11 @@ class IngestionHistory(ABC): should fall in order to be "desired". :return: A GranuleStatus enum. """ - signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_datetime.timestamp()) + signature = self._signature_fun(file_path) if self._signature_fun else str(modified_timestamp) - if self._in_time_range(modified_datetime, start_date=self._latest_ingested_mtime()): + if self._in_time_range(modified_timestamp, start_date=self._latest_ingested_mtime()): return GranuleStatus.DESIRED_FORWARD_PROCESSING - elif self._in_time_range(modified_datetime, date_from, date_to) and not await self._already_ingested(file_path, signature): + elif self._in_time_range(modified_timestamp, date_from, date_to) and not await self._already_ingested(file_path, signature): return GranuleStatus.DESIRED_HISTORICAL else: return GranuleStatus.UNDESIRED @@ -127,18 +126,14 @@ class IngestionHistory(ABC): pass @staticmethod - def _in_time_range(date: datetime, start_date: datetime = None, end_date: datetime = None): + def _in_time_range(timestamp: int, start_date: datetime = None, end_date: datetime = None): """ :param file: file path as a string :param date_from: timestamp, can be None :param date_to: timestamp, can be None :return: True is the update time of the file is between ts_from and ts_to. False otherwise """ - is_after_from = start_date.timestamp() < date.timestamp() if start_date else True - is_before_to = end_date.timestamp() > date.timestamp() if end_date else True + is_after_from = int(start_date.timestamp()) < timestamp if start_date else True + is_before_to = int(end_date.timestamp()) > timestamp if end_date else True return is_after_from and is_before_to - - @staticmethod - def _signature_from_timestamp(timestamp: float): - return str(int(timestamp)) diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py index ebed073..c6d26a5 100644 --- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py @@ -64,7 +64,7 @@ class SolrIngestionHistory(IngestionHistory): self._solr_datasets.add([{ 'id': self._dataset_id, 'dataset_s': self._dataset_id, - 'latest_update_l': int(self._latest_ingested_file_update)}]) + 'latest_update_l': self._latest_ingested_file_update}]) self._solr_datasets.commit() def _get_latest_file_update(self):
