This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch s3-support in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 6afe67f9e30930e1af687ea6ad8e904bcd4d28cb Author: Eamon Ford <[email protected]> AuthorDate: Tue Oct 6 17:20:36 2020 -0700 wip --- .../collection_manager/entities/Collection.py | 16 +++++++----- .../services/CollectionWatcher.py | 29 ++++++++++++++++------ .../collection_manager/services/S3Observer.py | 6 ++--- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py index 0feba0e..b49ecbb 100644 --- a/collection_manager/collection_manager/entities/Collection.py +++ b/collection_manager/collection_manager/entities/Collection.py @@ -1,4 +1,5 @@ import os +from urllib.parse import urlparse from dataclasses import dataclass from datetime import datetime from fnmatch import fnmatch @@ -46,10 +47,13 @@ class Collection: return os.path.dirname(self.path) def owns_file(self, file_path: str) -> bool: - if os.path.isdir(file_path): - raise IsADirectoryError() - - if os.path.isdir(self.path): - return os.path.dirname(file_path) == self.path + if urlparse(file_path).scheme == 's3': + return file_path.find(self.path) == 0 else: - return fnmatch(file_path, self.path) + if os.path.isdir(file_path): + raise IsADirectoryError() + + if os.path.isdir(self.path): + return os.path.dirname(file_path) == self.path + else: + return fnmatch(file_path, self.path) diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 499a17d..215e80e 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -1,4 +1,5 @@ import asyncio +from collection_manager.services.S3Observer import S3Observer import logging import os import time @@ -8,10 +9,12 @@ from typing import Awaitable, Callable, Dict, Optional, Set import yaml from collection_manager.entities import Collection -from collection_manager.entities.exceptions import ( - CollectionConfigFileNotFoundError, CollectionConfigParsingError, - ConflictingPathCollectionError, MissingValueCollectionError, - RelativePathCollectionError, RelativePathError) +from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError, + CollectionConfigParsingError, + ConflictingPathCollectionError, + MissingValueCollectionError, + RelativePathCollectionError, + RelativePathError) from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler from watchdog.observers.polling import PollingObserver as Observer @@ -23,6 +26,7 @@ class CollectionWatcher: def __init__(self, collections_path: str, granule_updated_callback: Callable[[str, Collection], Awaitable], + s3: bool = False, collections_refresh_interval: float = 30): if not os.path.isabs(collections_path): raise RelativePathError("Collections config path must be an absolute path.") @@ -32,7 +36,11 @@ class CollectionWatcher: self._collections_refresh_interval = collections_refresh_interval self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set) - self._observer = Observer() + + if s3: + self._observer = S3Observer('nexus-ingest') + else: + self._observer = Observer() self._granule_watches = set() @@ -47,7 +55,11 @@ class CollectionWatcher: await self._run_periodically(loop=loop, wait_time=self._collections_refresh_interval, func=self._reload_and_reschedule) - self._observer.start() + + if type(self._observer) == S3Observer: + await self._observer.start() + else: + self._observer.start() def _collections(self) -> Set[Collection]: """ @@ -130,7 +142,10 @@ class CollectionWatcher: # Note: the Watchdog library does not schedule a new watch # if one is already scheduled for the same directory try: - self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True)) + if type(self._observer) == S3Observer: + self._granule_watches.add(self._observer.schedule(granule_event_handler, directory)) + else: + self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True)) except (FileNotFoundError, NotADirectoryError): bad_collection_names = ' and '.join([col.dataset_id for col in collections]) logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.") diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py index 9a86d1e..7720432 100644 --- a/collection_manager/collection_manager/services/S3Observer.py +++ b/collection_manager/collection_manager/services/S3Observer.py @@ -48,7 +48,7 @@ class S3Observer: def unschedule(self, watch: S3Watch): self._watches.remove(watch) - def schedule(self, path: str, event_handler): + def schedule(self,event_handler, path: str): watch = S3Watch(path=path, event_handler=event_handler) self._watches.add(watch) return watch @@ -116,8 +116,8 @@ class S3Observer: async def test(): observer = S3Observer(bucket="nexus-ingest", initial_scan=False) handler = Handler() - observer.schedule('avhrr/2012', handler) - observer.schedule('avhrr/2013', handler) + observer.schedule(handler, 'avhrr/2012') + observer.schedule(handler, 'avhrr/2013') await observer.start()
