This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch s3-support in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 1a15971e946950e86769fd3763ced15a2e5bb00d Author: Eamon Ford <[email protected]> AuthorDate: Thu Oct 29 15:33:56 2020 -0700 fix signature_fun for s3 --- collection_manager/collection_manager/main.py | 21 +++++++++++---- .../history_manager/FileIngestionHistory.py | 3 +-- .../services/history_manager/IngestionHistory.py | 30 +++++++++++++--------- .../history_manager/SolrIngestionHistory.py | 7 ++--- 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index 044cb87..3dba6e0 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -3,8 +3,11 @@ import asyncio import logging import os -from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher -from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder +from collection_manager.services import (CollectionProcessor, + CollectionWatcher, MessagePublisher) +from collection_manager.services.history_manager import ( + FileIngestionHistoryBuilder, SolrIngestionHistoryBuilder, + md5sum_from_filepath) logging.basicConfig(level=logging.INFO) logging.getLogger("pika").setLevel(logging.WARNING) @@ -58,11 +61,19 @@ def get_args() -> argparse.Namespace: async def main(): try: options = get_args() + ENABLE_S3 = False + + if ENABLE_S3: + signature_fun = None + else: + signature_fun = md5sum_from_filepath if options.history_path: - history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path) + history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path, + signature_fun=signature_fun) else: - history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url) + history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url, + signature_fun=signature_fun) async with MessagePublisher(host=options.rabbitmq_host, username=options.rabbitmq_username, password=options.rabbitmq_password, @@ -72,7 +83,7 @@ async def main(): collection_watcher = CollectionWatcher(collections_path=options.collections_path, granule_updated_callback=collection_processor.process_granule, collections_refresh_interval=int(options.refresh), - s3=True) + s3=ENABLE_S3) await collection_watcher.start_watching() while True: diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py index ffa065f..cf92997 100644 --- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py @@ -4,7 +4,6 @@ from pathlib import Path from collection_manager.services.history_manager.IngestionHistory import IngestionHistory from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder -from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath logger = logging.getLogger(__name__) @@ -33,7 +32,7 @@ class FileIngestionHistory(IngestionHistory): """ self._dataset_id = dataset_id self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv') - self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun + self._signature_fun = signature_fun self._history_dict = {} self._load_history_dict() diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py index ea50ffb..d901690 100644 --- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py @@ -38,28 +38,29 @@ class GranuleStatus(Enum): class IngestionHistory(ABC): _signature_fun = None - _latest_ingested_file_update = None + _latest_ingested_file_update: float = None - async def push(self, file_path: str, modified_time: datetime): + async def push(self, file_path: str, modified_datetime: datetime): """ Record a file as having been ingested. :param file_path: The full path to the file to record. :return: None """ + modified_timestamp = int(modified_datetime.timestamp()) file_name = IngestionHistory._get_standardized_path(file_path) - signature = self._signature_fun(file_path) + signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_timestamp) await self._push_record(file_name, signature) if not self._latest_ingested_file_update: - self._latest_ingested_file_update = modified_time + self._latest_ingested_file_update = modified_timestamp else: - self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_time) + self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_timestamp) await self._save_latest_timestamp() async def get_granule_status(self, file_path: str, - modified_time: datetime, + modified_datetime: datetime, date_from: datetime = None, date_to: datetime = None) -> GranuleStatus: """ @@ -76,9 +77,11 @@ class IngestionHistory(ABC): should fall in order to be "desired". :return: A GranuleStatus enum. """ - if self._in_time_range(modified_time, start_date=self._latest_ingested_mtime()): + signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_datetime.timestamp()) + + if self._in_time_range(modified_datetime, start_date=self._latest_ingested_mtime()): return GranuleStatus.DESIRED_FORWARD_PROCESSING - elif self._in_time_range(modified_time, date_from, date_to) and not await self._already_ingested(file_path): + elif self._in_time_range(modified_datetime, date_from, date_to) and not await self._already_ingested(file_path, signature): return GranuleStatus.DESIRED_HISTORICAL else: return GranuleStatus.UNDESIRED @@ -102,14 +105,13 @@ class IngestionHistory(ABC): else: return None - async def _already_ingested(self, file_path: str) -> bool: + async def _already_ingested(self, file_path: str, signature) -> bool: """ Return a boolean indicating whether the specified file has already been ingested, based on its signature. :param file_path: The full path of a file to search for in the history. :return: A boolean indicating whether this file has already been ingested or not """ file_name = IngestionHistory._get_standardized_path(file_path) - signature = self._signature_fun(file_path) return signature == await self._get_signature(file_name) @abstractmethod @@ -132,7 +134,11 @@ class IngestionHistory(ABC): :param date_to: timestamp, can be None :return: True is the update time of the file is between ts_from and ts_to. False otherwise """ - is_after_from = start_date.timestamp() < date if start_date else True - is_before_to = end_date.timestamp() > date if end_date else True + is_after_from = start_date.timestamp() < date.timestamp() if start_date else True + is_before_to = end_date.timestamp() > date.timestamp() if end_date else True return is_after_from and is_before_to + + @staticmethod + def _signature_from_timestamp(timestamp: float): + return str(int(timestamp)) diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py index 59f5cd7..ebed073 100644 --- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py @@ -3,11 +3,8 @@ import logging import pysolr import requests - +from collection_manager.services.history_manager.IngestionHistory import (IngestionHistory, IngestionHistoryBuilder) from common.async_utils.AsyncUtils import run_in_executor -from collection_manager.services.history_manager.IngestionHistory import IngestionHistory -from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder -from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath logging.getLogger("pysolr").setLevel(logging.WARNING) logger = logging.getLogger(__name__) @@ -40,7 +37,7 @@ class SolrIngestionHistory(IngestionHistory): self._solr_granules = pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}") self._solr_datasets = pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}") self._dataset_id = dataset_id - self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun + self._signature_fun = signature_fun self._latest_ingested_file_update = self._get_latest_file_update() except requests.exceptions.RequestException: raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}")
