This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch ascending_latitudes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 31bff5e6c524c213ae33fe1af39f18f4da2969cc
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Nov 12 14:59:34 2020 -0800

    SDAP-288: S3 ingestion support (#24)
---
 .../collection_manager/entities/Collection.py      |  33 +++--
 .../collection_manager/entities/__init__.py        |   1 +
 collection_manager/collection_manager/main.py      |  22 +++-
 .../services/CollectionProcessor.py                |  28 ++--
 .../services/CollectionWatcher.py                  |  90 ++++++++-----
 .../collection_manager/services/S3Observer.py      | 145 +++++++++++++++++++++
 .../collection_manager/services/__init__.py        |   1 +
 .../history_manager/FileIngestionHistory.py        |   3 +-
 .../services/history_manager/IngestionHistory.py   |  79 ++++++-----
 .../history_manager/SolrIngestionHistory.py        |   9 +-
 collection_manager/docker/Dockerfile               |   4 +-
 collection_manager/docker/entrypoint.sh            |   5 +-
 collection_manager/requirements.txt                |   6 +-
 collection_manager/setup.py                        |   2 +-
 .../history_manager/test_FileIngestionHistory.py   |   6 +-
 .../tests/services/test_S3Observer.py              |   8 ++
 granule_ingester/conda-requirements.txt            |   1 -
 granule_ingester/docker/Dockerfile                 |   2 +
 granule_ingester/requirements.txt                  |   5 +-
 19 files changed, 334 insertions(+), 116 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 031a3a9..7a45b66 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,13 +1,20 @@
 import os
+from urllib.parse import urlparse
 from dataclasses import dataclass
 from datetime import datetime
 from fnmatch import fnmatch
 from glob import glob
 from typing import List, Optional
+from enum import Enum
 
 from collection_manager.entities.exceptions import MissingValueCollectionError
 
 
+class CollectionStorageType(Enum):
+    LOCAL = 1
+    S3 = 2
+
+
 @dataclass(frozen=True)
 class Collection:
     dataset_id: str
@@ -39,20 +46,28 @@ class Collection:
         except KeyError as e:
             raise MissingValueCollectionError(missing_value=e.args[0])
 
+    def storage_type(self):
+        if urlparse(self.path).scheme == 's3':
+            return CollectionStorageType.S3
+        else:
+            return CollectionStorageType.LOCAL
+
     def directory(self):
-        if os.path.isdir(self.path):
+        if urlparse(self.path).scheme == 's3':
+            return self.path
+        elif os.path.isdir(self.path):
             return self.path
         else:
             return os.path.dirname(self.path)
 
     def owns_file(self, file_path: str) -> bool:
-        if os.path.isdir(file_path):
-            raise IsADirectoryError()
-
-        if os.path.isdir(self.path):
-            return os.path.dirname(file_path) == self.path
+        if urlparse(file_path).scheme == 's3':
+            return file_path.find(self.path) == 0
         else:
-            return fnmatch(file_path, self.path)
+            if os.path.isdir(file_path):
+                raise IsADirectoryError()
 
-    def files_owned(self) -> List[str]:
-        return glob(self.path, recursive=True)
+            if os.path.isdir(self.path):
+                return os.path.dirname(file_path) == self.path
+            else:
+                return fnmatch(file_path, self.path)
diff --git a/collection_manager/collection_manager/entities/__init__.py b/collection_manager/collection_manager/entities/__init__.py
index 165341b..b9c7a25 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1 +1,2 @@
 from .Collection import Collection
+from .Collection import CollectionStorageType
\ No newline at end of file
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index cbe22f9..b80ae7c 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -3,8 +3,11 @@ import asyncio
 import logging
 import os
 
-from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher
-from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder
+from collection_manager.services import (CollectionProcessor,
+                                         CollectionWatcher, MessagePublisher)
+from collection_manager.services.history_manager import (
+    FileIngestionHistoryBuilder, SolrIngestionHistoryBuilder,
+    md5sum_from_filepath)
 
 logging.basicConfig(level=logging.INFO)
 logging.getLogger("pika").setLevel(logging.WARNING)
@@ -51,6 +54,9 @@ def get_args() -> argparse.Namespace:
                         default='30',
                         metavar="INTERVAL",
                         help='Number of seconds after which to reload the collections config file. (Default: 30)')
+    parser.add_argument('--s3-bucket',
+                        metavar='S3-BUCKET',
+                        help='Optional name of an AWS S3 bucket where granules are stored. If this option is set, then all collections to be scanned must have their granules on S3, not the local filesystem.')
 
     return parser.parse_args()
 
@@ -59,10 +65,14 @@ async def main():
     try:
         options = get_args()
 
+        signature_fun = None if options.s3_bucket else md5sum_from_filepath
+
         if options.history_path:
-            history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
+            history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path,
+                                                                  signature_fun=signature_fun)
         else:
-            history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
+            history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url,
+                                                                  signature_fun=signature_fun)
         async with MessagePublisher(host=options.rabbitmq_host,
                                     username=options.rabbitmq_username,
                                     password=options.rabbitmq_password,
@@ -70,9 +80,9 @@ async def main():
             collection_processor = CollectionProcessor(message_publisher=publisher,
                                                        history_manager_builder=history_manager_builder)
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
-                                                   collection_updated_callback=collection_processor.process_collection,
                                                    granule_updated_callback=collection_processor.process_granule,
-                                                   collections_refresh_interval=int(options.refresh))
+                                                   collections_refresh_interval=int(options.refresh),
+                                                   s3_bucket=options.s3_bucket)
 
             await collection_watcher.start_watching()
 
             while True:
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 975f50c..96c461e 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -1,12 +1,16 @@
 import logging
 import os.path
+from glob import glob
 from typing import Dict
-import yaml
+from datetime import datetime
 
+import yaml
 from collection_manager.entities import Collection
 from collection_manager.services import MessagePublisher
-from collection_manager.services.history_manager import IngestionHistory, GranuleStatus
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
+from collection_manager.services.history_manager import (GranuleStatus,
+                                                         IngestionHistory)
+from collection_manager.services.history_manager.IngestionHistory import \
+    IngestionHistoryBuilder
 
 logger = logging.getLogger(__name__)
 
@@ -20,16 +24,7 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_collection(self, collection: Collection):
-        """
-        Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
-        :param collection: A Collection definition
-        :return: None
-        """
-        for granule in collection.files_owned():
-            await self.process_granule(granule, collection)
-
-    async def process_granule(self, granule: str, collection: Collection):
+    async def process_granule(self, granule: str, modified_time: int, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -40,7 +35,10 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule,
+                                                                  modified_time,
+                                                                  collection.date_from,
+                                                                  collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -60,7 +58,7 @@ class CollectionProcessor:
             dataset_config = self._generate_ingestion_message(granule, collection)
             await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-            await history_manager.push(granule)
+            await history_manager.push(granule, modified_time)
 
     @staticmethod
     def _file_supported(file_path: str):
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1c7c1be..b1aaf4e 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,17 +1,22 @@
 import asyncio
+from datetime import datetime
+from collection_manager.entities.Collection import CollectionStorageType, Collection
+from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
 from collections import defaultdict
-from typing import Awaitable, Callable, Dict, Optional, Set
+from glob import glob
+from typing import Awaitable, Callable, Dict, List, Optional, Set
 
 import yaml
-from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import (
-    CollectionConfigFileNotFoundError, CollectionConfigParsingError,
-    ConflictingPathCollectionError, MissingValueCollectionError,
-    RelativePathCollectionError, RelativePathError)
-from watchdog.events import FileSystemEventHandler
+from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
+                                                    CollectionConfigParsingError,
+                                                    ConflictingPathCollectionError,
+                                                    MissingValueCollectionError,
+                                                    RelativePathCollectionError,
+                                                    RelativePathError)
+from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
 from watchdog.observers.polling import PollingObserver as Observer
 
 logger = logging.getLogger(__name__)
@@ -21,19 +26,18 @@ logger.setLevel(logging.DEBUG)
 
 class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
-                 collection_updated_callback: Callable[[Collection], Awaitable],
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
+                 s3_bucket: Optional[str] = None,
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config path must be an absolute path.")
 
         self._collections_path = collections_path
-        self._collection_updated_callback = collection_updated_callback
         self._granule_updated_callback = granule_updated_callback
         self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
-        self._observer = Observer()
+        self._observer = S3Observer(s3_bucket, initial_scan=True) if s3_bucket else Observer()
 
         self._granule_watches = set()
@@ -48,7 +52,11 @@ class CollectionWatcher:
         await self._run_periodically(loop=loop,
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
-        self._observer.start()
+
+        if isinstance(self._observer, S3Observer):
+            await self._observer.start()
+        else:
+            self._observer.start()
 
     def _collections(self) -> Set[Collection]:
         """
@@ -58,11 +66,15 @@ class CollectionWatcher:
         return {collection for collections in self._collections_by_dir.values() for collection in collections}
 
     def _validate_collection(self, collection: Collection):
-        directory = collection.directory()
-        if not os.path.isabs(directory):
-            raise RelativePathCollectionError(collection=collection)
-        if directory == os.path.dirname(self._collections_path):
-            raise ConflictingPathCollectionError(collection=collection)
+        if collection.storage_type() == CollectionStorageType.S3:
+            # do some S3 path validation here
+            return
+        else:
+            directory = collection.directory()
+            if not os.path.isabs(directory):
+                raise RelativePathCollectionError(collection=collection)
+            if directory == os.path.dirname(self._collections_path):
+                raise ConflictingPathCollectionError(collection=collection)
 
     def _load_collections(self):
         try:
@@ -100,15 +112,23 @@ class CollectionWatcher:
         self._load_collections()
         return self._collections() - old_collections
 
+    async def _call_callback_for_all_granules(self, collections: List[Collection]):
+        logger.info(f"Scanning files for {len(collections)} collections...")
+        start = time.perf_counter()
+        for collection in collections:
+            for granule_path in glob(collection.path, recursive=True):
+                modified_time = int(os.path.getmtime(granule_path))
+                await self._granule_updated_callback(granule_path, modified_time, collection)
+        logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+
     async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             if len(updated_collections) > 0:
-                logger.info(f"Scanning files for {len(updated_collections)} collections...")
-                start = time.perf_counter()
-                for collection in updated_collections:
-                    await self._collection_updated_callback(collection)
-                logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+                # For S3 collections, the S3Observer will report as new any files that haven't already been scanned.
+                # So we only need to rescan granules here if not using S3.
+                if not isinstance(self._observer, S3Observer):
+                    await self._call_callback_for_all_granules(collections=updated_collections)
 
                 self._unschedule_watches()
                 self._schedule_watches()
@@ -128,7 +148,10 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
+                if isinstance(self._observer, S3Observer):
+                    self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+                else:
+                    self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
             except (FileNotFoundError, NotADirectoryError):
                 bad_collection_names = ' and '.join([col.dataset_id for col in collections])
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
@@ -168,18 +191,21 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
     def on_created(self, event):
         super().on_created(event)
-        for collection in self._collections_for_dir:
-            try:
-                if collection.owns_file(event.src_path):
-                    self._loop.create_task(self._callback(event.src_path, collection))
-            except IsADirectoryError:
-                pass
+        self._handle_event(event)
 
     def on_modified(self, event):
         super().on_modified(event)
-        if os.path.isdir(event.src_path):
-            return
+        self._handle_event(event)
 
+    def _handle_event(self, event):
+        path = event.src_path
         for collection in self._collections_for_dir:
-            if collection.owns_file(event.src_path):
-                self._loop.create_task(self._callback(event.src_path, collection))
+            try:
+                if collection.owns_file(path):
+                    if isinstance(event, S3Event):
+                        modified_time = int(event.modified_time.timestamp())
+                    else:
+                        modified_time = int(os.path.getmtime(path))
+                    self._loop.create_task(self._callback(path, modified_time, collection))
+            except IsADirectoryError:
+                return
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
new file mode 100644
index 0000000..87458a9
--- /dev/null
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -0,0 +1,145 @@
+import asyncio
+from urllib.parse import urlparse
+import datetime
+import os
+import time
+from dataclasses import dataclass
+from typing import Set, Dict, Optional, Callable, Awaitable
+
+import aioboto3
+
+
+@dataclass
+class S3Event:
+    src_path: str
+    modified_time: datetime.datetime
+
+
+class S3FileModifiedEvent(S3Event):
+    pass
+
+
+class S3FileCreatedEvent(S3Event):
+    pass
+
+
+class S3Watch(object):
+    def __init__(self, path: str, event_handler) -> None:
+        self.path = path
+        self.event_handler = event_handler
+
+
+class S3Observer:
+
+    def __init__(self, bucket, initial_scan=False) -> None:
+        self._bucket = bucket
+        self._cache: Dict[str, datetime.datetime] = {}
+        self._initial_scan = initial_scan
+        self._watches: Set[S3Watch] = set()
+
+        self._has_polled = False
+
+    async def start(self):
+        await self._run_periodically(loop=None,
+                                     wait_time=30,
+                                     func=self._poll)
+
+    def unschedule(self, watch: S3Watch):
+        self._watches.remove(watch)
+
+    def schedule(self, event_handler, path: str):
+        watch = S3Watch(path=path, event_handler=event_handler)
+        self._watches.add(watch)
+        return watch
+
+    @classmethod
+    async def _run_periodically(cls,
+                                loop: Optional[asyncio.AbstractEventLoop],
+                                wait_time: float,
+                                func: Callable[[any], Awaitable],
+                                *args,
+                                **kwargs):
+        """
+        Call a function periodically. This uses asyncio, and is non-blocking.
+        :param loop: An optional event loop to use. If None, the current running event loop will be used.
+        :param wait_time: seconds to wait between iterations of func
+        :param func: the async function that will be awaited
+        :param args: any args that need to be provided to func
+        """
+        if loop is None:
+            loop = asyncio.get_running_loop()
+        await func(*args, **kwargs)
+        loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs))
+
+    async def _poll(self):
+        new_cache = {}
+        watch_index = {}
+
+        for watch in self._watches:
+            new_cache_for_watch = await self._get_s3_files(watch.path)
+            new_index = {file: watch for file in new_cache_for_watch}
+
+            new_cache = {**new_cache, **new_cache_for_watch}
+            watch_index = {**watch_index, **new_index}
+        difference = set(new_cache.items()) - set(self._cache.items())
+
+        if self._has_polled or self._initial_scan:
+            for (file, modified_date) in difference:
+                watch = watch_index[file]
+                file_is_new = file not in self._cache
+
+                if file_is_new:
+                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file, modified_time=modified_date))
+                else:
+                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file, modified_time=modified_date))
+
+        self._cache = new_cache
+        self._has_polled = True
+
+    async def _get_s3_files(self, path: str):
+        new_cache = {}
+
+        start = time.perf_counter()
+        async with aioboto3.resource("s3") as s3:
+            bucket = await s3.Bucket(self._bucket)
+
+            object_key = S3Observer._get_object_key(path)
+            async for file in bucket.objects.filter(Prefix=object_key):
+                new_cache[f"s3://{file.bucket_name}/{file.key}"] = await file.last_modified
+        end = time.perf_counter()
+        duration = end - start
+
+        print(f"Retrieved {len(new_cache)} objects in {duration}")
+
+        return new_cache
+
+    def _get_object_key(full_path: str):
+        key = urlparse(full_path).path.strip("/")
+        return key
+
+
+async def test():
+    observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
+    handler = Handler()
+    observer.schedule(handler, 'avhrr/2012')
+    observer.schedule(handler, 'avhrr/2013')
+
+    await observer.start()
+
+    while True:
+        try:
+            await asyncio.sleep(1)
+        except KeyboardInterrupt:
+            return
+
+
+class Handler:
+    def on_created(self, event: S3Event):
+        print(f"File created: {event.src_path}")
+
+    def on_modified(self, event: S3Event):
+        print(f"File modified: {event.src_path}")
+
+
+if __name__ == "__main__":
+    asyncio.run(test())
diff --git a/collection_manager/collection_manager/services/__init__.py b/collection_manager/collection_manager/services/__init__.py
index 635d3dc..553e1b7 100644
--- a/collection_manager/collection_manager/services/__init__.py
+++ b/collection_manager/collection_manager/services/__init__.py
@@ -16,3 +16,4 @@
 from .CollectionProcessor import CollectionProcessor
 from .CollectionWatcher import CollectionWatcher
 from .MessagePublisher import MessagePublisher
+from .S3Observer import S3Observer
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index ffa065f..cf92997 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -4,7 +4,6 @@ from pathlib import Path
 
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
 logger = logging.getLogger(__name__)
 
@@ -33,7 +32,7 @@ class FileIngestionHistory(IngestionHistory):
         """
         self._dataset_id = dataset_id
         self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
-        self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
+        self._signature_fun = signature_fun
         self._history_dict = {}
         self._load_history_dict()
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index ef73ccb..7f33c79 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -1,4 +1,5 @@
 import hashlib
+from urllib.parse import urlparse
 import logging
 import os
 from abc import ABC, abstractmethod
@@ -6,6 +7,8 @@ from datetime import datetime
 from enum import Enum
 from typing import Optional
 
+from botocore.compat import filter_ssl_warnings
+
 logger = logging.getLogger(__name__)
 
 BLOCK_SIZE = 65536
@@ -35,49 +38,28 @@ class GranuleStatus(Enum):
 class IngestionHistory(ABC):
     _signature_fun = None
-    _latest_ingested_file_update = None
+    _latest_ingested_file_update: int = None
 
-    async def push(self, file_path: str):
+    async def push(self, file_path: str, modified_timestamp: int):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
         :return: None
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
-        signature = self._signature_fun(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
+        signature = self._signature_fun(file_path) if self._signature_fun else str(modified_timestamp)
         await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = os.path.getmtime(file_path)
+            self._latest_ingested_file_update = modified_timestamp
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_timestamp)
 
         await self._save_latest_timestamp()
 
-    def latest_ingested_mtime(self) -> Optional[datetime]:
-        """
-        Return the modified time of the most recently modified file that was ingested.
-        :return: A datetime or None
-        """
-        if self._latest_ingested_file_update:
-            return datetime.fromtimestamp(self._latest_ingested_file_update)
-        else:
-            return None
-
-    async def already_ingested(self, file_path: str) -> bool:
-        """
-        Return a boolean indicating whether the specified file has already been ingested, based on its signature.
-        :param file_path: The full path of a file to search for in the history.
-        :return: A boolean indicating whether this file has already been ingested or not
-        """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
-        signature = self._signature_fun(file_path)
-        return signature == await self._get_signature(file_name)
-
     async def get_granule_status(self,
                                  file_path: str,
+                                 modified_timestamp: int,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -94,13 +76,43 @@
                           should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
+        signature = self._signature_fun(file_path) if self._signature_fun else str(modified_timestamp)
+
+        if self._in_time_range(modified_timestamp, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
+        elif self._in_time_range(modified_timestamp, date_from, date_to) and not await self._already_ingested(file_path, signature):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
+    def _get_standardized_path(file_path: str):
+        file_path = file_path.strip()
+        # TODO: Why do we need to record the basename of the path, instead of just the full path?
+        # The only reason this is here right now is for backwards compatibility to avoid triggering a full reingestion.
+        if urlparse(file_path).scheme == 's3':
+            return urlparse(file_path).path.strip("/")
+        else:
+            return os.path.basename(file_path)
+
+    def _latest_ingested_mtime(self) -> Optional[datetime]:
+        """
+        Return the modified time of the most recently modified file that was ingested.
+        :return: A datetime or None
+        """
+        if self._latest_ingested_file_update:
+            return datetime.fromtimestamp(self._latest_ingested_file_update)
+        else:
+            return None
+
+    async def _already_ingested(self, file_path: str, signature) -> bool:
+        """
+        Return a boolean indicating whether the specified file has already been ingested, based on its signature.
+        :param file_path: The full path of a file to search for in the history.
+        :return: A boolean indicating whether this file has already been ingested or not
+        """
+        file_name = IngestionHistory._get_standardized_path(file_path)
+        return signature == await self._get_signature(file_name)
+
     @abstractmethod
     async def _save_latest_timestamp(self):
         pass
@@ -114,15 +126,14 @@
         pass
 
     @staticmethod
-    def _in_time_range(file, date_from: datetime = None, date_to: datetime = None):
+    def _in_time_range(timestamp: int, start_date: datetime = None, end_date: datetime = None):
         """
         :param file: file path as a string
        :param date_from: timestamp, can be None
         :param date_to: timestamp, can be None
         :return: True is the update time of the file is between ts_from and ts_to.
                  False otherwise
         """
-        file_modified_time = os.path.getmtime(file)
-        is_after_from = date_from.timestamp() < file_modified_time if date_from else True
-        is_before_to = date_to.timestamp() > file_modified_time if date_to else True
+        is_after_from = int(start_date.timestamp()) < timestamp if start_date else True
+        is_before_to = int(end_date.timestamp()) > timestamp if end_date else True
         return is_after_from and is_before_to
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 59f5cd7..c6d26a5 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -3,11 +3,8 @@ import logging
 
 import pysolr
 import requests
-
+from collection_manager.services.history_manager.IngestionHistory import (IngestionHistory, IngestionHistoryBuilder)
 from common.async_utils.AsyncUtils import run_in_executor
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
 logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
@@ -40,7 +37,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_granules = pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}")
             self._solr_datasets = pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}")
             self._dataset_id = dataset_id
-            self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
+            self._signature_fun = signature_fun
             self._latest_ingested_file_update = self._get_latest_file_update()
         except requests.exceptions.RequestException:
             raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}")
@@ -67,7 +64,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_datasets.add([{
                 'id': self._dataset_id,
                 'dataset_s': self._dataset_id,
-                'latest_update_l': int(self._latest_ingested_file_update)}])
+                'latest_update_l': self._latest_ingested_file_update}])
             self._solr_datasets.commit()
 
     def _get_latest_file_update(self):
diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index 2a57784..83e94ad 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -12,7 +12,9 @@ COPY collection_manager/requirements.txt /collection_manager/requirements.txt
 COPY collection_manager/README.md /collection_manager/README.md
 COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
 
-RUN cd /common && python setup.py install
+RUN cd /common && python setup.py install
 RUN cd /collection_manager && python setup.py install
 
+RUN pip install boto3==1.16.10
+
 ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
diff --git a/collection_manager/docker/entrypoint.sh b/collection_manager/docker/entrypoint.sh
index 988dd2c..cad304a 100644
--- a/collection_manager/docker/entrypoint.sh
+++ b/collection_manager/docker/entrypoint.sh
@@ -7,5 +7,6 @@ python /collection_manager/collection_manager/main.py \
   $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq-password=$RABBITMQ_PASSWORD) \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \
-  $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH)
-  $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH)
+  $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH) \
+  $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH) \
+  $([[ ! -z "$S3_BUCKET" ]] && echo --s3-bucket=$S3_BUCKET)
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index ee12c89..c4b6323 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -3,5 +3,7 @@ pystache==0.5.4
 pysolr==3.9.0
 watchdog==0.10.2
 requests==2.23.0
-aio-pika==6.6.1
-tenacity==6.2.0
\ No newline at end of file
+tenacity==6.2.0
+aioboto3==8.0.5
+aiohttp==3.7.2
+aio-pika==6.7.1
\ No newline at end of file
diff --git a/collection_manager/setup.py b/collection_manager/setup.py
index 0616d0f..e1178f8 100644
--- a/collection_manager/setup.py
+++ b/collection_manager/setup.py
@@ -29,7 +29,7 @@ setuptools.setup(
         "Operating System :: OS Independent",
         "Development Status :: 4 - Beta",
     ],
-    python_requires='>=3.6',
+    python_requires='>=3.8',
     include_package_data=True,
     install_requires=pip_requirements
 )
diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
index 07ab0e1..8bd939e 100644
--- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
+++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -36,7 +36,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
             await ingestion_history.push(str(current_file_path))
-            self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
+            self.assertTrue(await ingestion_history._already_ingested(str(current_file_path)))
 
             del ingestion_history
 
@@ -47,7 +47,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
             await ingestion_history.push(str(current_file_path))
-            self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
+            self.assertTrue(await ingestion_history._already_ingested(str(current_file_path)))
 
             del ingestion_history
 
@@ -57,7 +57,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
-            self.assertFalse(await ingestion_history.already_ingested(str(current_file_path)))
+            self.assertFalse(await ingestion_history._already_ingested(str(current_file_path)))
 
 
 if __name__ == '__main__':
diff --git a/collection_manager/tests/services/test_S3Observer.py b/collection_manager/tests/services/test_S3Observer.py
new file mode 100644
index 0000000..3fa49e0
--- /dev/null
+++ b/collection_manager/tests/services/test_S3Observer.py
@@ -0,0 +1,8 @@
+from collection_manager.services import S3Observer
+import unittest
+
+
+class TestS3Observer(unittest.TestCase):
+
+    def test_get_object_key(self):
+        self.assertEqual('test_dir/object.nc', S3Observer._get_object_key('s3://test-bucket/test_dir/object.nc'))
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index da92b1e..810e278 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -7,5 +7,4 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
index 57bacff..1e7aedd 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -18,6 +18,8 @@ RUN ./install_nexusproto.sh
 RUN cd /common && python setup.py install
 RUN cd /sdap && python setup.py install
 
+RUN pip install boto3==1.16.10
+
 RUN apk del .build-deps
 
 ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 9b06860..d82e6ce 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,6 +1,7 @@
 cassandra-driver==3.23.0
 aiomultiprocess==0.7.0
-aioboto3
+aioboto3==8.0.5
 tblib==1.6.0
 pysolr==3.9.0
-kazoo==2.8.0
\ No newline at end of file
+kazoo==2.8.0
+aio-pika==6.7.1
\ No newline at end of file
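Two implementation notes with illustrative sketches follow; neither is part of the commit itself. First, change detection: S3 provides no filesystem events, so S3Observer._poll compares successive listings of each watched prefix and classifies objects by set difference. A minimal standalone sketch of that technique, with hypothetical object keys and dates:

from datetime import datetime

previous = {"s3://bucket/a.nc": datetime(2020, 11, 1)}
current = {"s3://bucket/a.nc": datetime(2020, 11, 12),  # modified since last poll
           "s3://bucket/b.nc": datetime(2020, 11, 12)}  # newly created

# Items present in 'current' but absent from 'previous' are exactly the keys
# that are new or whose last-modified time changed, mirroring S3Observer._poll.
for key, modified in set(current.items()) - set(previous.items()):
    if key in previous:
        print(f"modified: {key} at {modified}")
    else:
        print(f"created:  {key} at {modified}")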
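Second, ingestion-history signatures: main.py sets signature_fun to None whenever --s3-bucket is given, and IngestionHistory.push then falls back to str(modified_timestamp), which avoids downloading every S3 object just to hash it. A sketch under those assumptions (the md5 helper body is a plausible reconstruction of the existing md5sum_from_filepath, which this diff does not show; the bucket name and timestamp are hypothetical):

import hashlib

def md5sum_from_filepath(file_path: str) -> str:
    # Plausible reconstruction: md5 digest over file contents, read in chunks.
    md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for block in iter(lambda: f.read(65536), b""):
            md5.update(block)
    return md5.hexdigest()

s3_bucket = "my-granule-bucket"  # hypothetical --s3-bucket value
signature_fun = None if s3_bucket else md5sum_from_filepath  # as in main.py

modified_timestamp = 1605221974  # e.g. int(os.path.getmtime(path)) or the S3 mtime
signature = signature_fun("granule.nc") if signature_fun else str(modified_timestamp)
print(signature)  # "1605221974" here, since the S3 branch skips content hashing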

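Finally, collection membership: Collection.owns_file now treats an s3:// granule as owned when its full path starts with the collection path, while local granules keep the directory/glob logic. A simplified standalone sketch (paths are hypothetical; the local branch is reduced to the fnmatch case):

from fnmatch import fnmatch
from urllib.parse import urlparse

def owns_file(collection_path: str, file_path: str) -> bool:
    # S3 collections: ownership is a simple path-prefix match, as in the commit.
    if urlparse(file_path).scheme == "s3":
        return file_path.find(collection_path) == 0
    # Local collections: simplified here to the glob-pattern branch only.
    return fnmatch(file_path, collection_path)

print(owns_file("s3://test-bucket/avhrr/2012", "s3://test-bucket/avhrr/2012/granule.nc"))  # True
print(owns_file("/data/avhrr/*.nc", "/data/avhrr/granule.nc"))  # True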