This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch s3-support in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit c56cd915a342f57d49912f63555fd93a15093099 Author: Eamon Ford <[email protected]> AuthorDate: Wed Oct 21 16:41:59 2020 -0700 wip --- .../collection_manager/entities/Collection.py | 12 ++++++++ .../collection_manager/entities/__init__.py | 1 + collection_manager/collection_manager/main.py | 3 +- .../services/CollectionWatcher.py | 33 ++++++++++++---------- .../collection_manager/services/S3Observer.py | 2 ++ collection_manager/requirements.txt | 3 +- 6 files changed, 37 insertions(+), 17 deletions(-) diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py index aa700cd..7a45b66 100644 --- a/collection_manager/collection_manager/entities/Collection.py +++ b/collection_manager/collection_manager/entities/Collection.py @@ -5,10 +5,16 @@ from datetime import datetime from fnmatch import fnmatch from glob import glob from typing import List, Optional +from enum import Enum from collection_manager.entities.exceptions import MissingValueCollectionError +class CollectionStorageType(Enum): + LOCAL = 1 + S3 = 2 + + @dataclass(frozen=True) class Collection: dataset_id: str @@ -40,6 +46,12 @@ class Collection: except KeyError as e: raise MissingValueCollectionError(missing_value=e.args[0]) + def storage_type(self): + if urlparse(self.path).scheme == 's3': + return CollectionStorageType.S3 + else: + return CollectionStorageType.LOCAL + def directory(self): if urlparse(self.path).scheme == 's3': return self.path diff --git a/collection_manager/collection_manager/entities/__init__.py b/collection_manager/collection_manager/entities/__init__.py index 165341b..b9c7a25 100644 --- a/collection_manager/collection_manager/entities/__init__.py +++ b/collection_manager/collection_manager/entities/__init__.py @@ -1 +1,2 @@ from .Collection import Collection +from .Collection import CollectionStorageType \ No newline at end of file diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index 3de4fdd..044cb87 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -71,7 +71,8 @@ async def main(): history_manager_builder=history_manager_builder) collection_watcher = CollectionWatcher(collections_path=options.collections_path, granule_updated_callback=collection_processor.process_granule, - collections_refresh_interval=int(options.refresh)) + collections_refresh_interval=int(options.refresh), + s3=True) await collection_watcher.start_watching() while True: diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 215e80e..87b1ac3 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -1,4 +1,5 @@ import asyncio +from collection_manager.entities.Collection import CollectionStorageType from collection_manager.services.S3Observer import S3Observer import logging import os @@ -69,11 +70,15 @@ class CollectionWatcher: return {collection for collections in self._collections_by_dir.values() for collection in collections} def _validate_collection(self, collection: Collection): - directory = collection.directory() - if not os.path.isabs(directory): - raise RelativePathCollectionError(collection=collection) - if directory == os.path.dirname(self._collections_path): - raise ConflictingPathCollectionError(collection=collection) + if collection.storage_type() == CollectionStorageType.S3: + # do some S3 path validation here + return + else: + directory = collection.directory() + if not os.path.isabs(directory): + raise RelativePathCollectionError(collection=collection) + if directory == os.path.dirname(self._collections_path): + raise ConflictingPathCollectionError(collection=collection) def _load_collections(self): try: @@ -185,18 +190,16 @@ class _GranuleEventHandler(FileSystemEventHandler): def on_created(self, event): super().on_created(event) - for collection in self._collections_for_dir: - try: - if collection.owns_file(event.src_path): - self._loop.create_task(self._callback(event.src_path, collection)) - except IsADirectoryError: - pass + self._handle_event(event) def on_modified(self, event): super().on_modified(event) - # if os.path.isdir(event.src_path): - # return - if type(event) == FileModifiedEvent: - for collection in self._collections_for_dir: + self._handle_event(event) + + def _handle_event(self, event): + for collection in self._collections_for_dir: + try: if collection.owns_file(event.src_path): self._loop.create_task(self._callback(event.src_path, collection)) + except IsADirectoryError: + return diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py index 7720432..376a907 100644 --- a/collection_manager/collection_manager/services/S3Observer.py +++ b/collection_manager/collection_manager/services/S3Observer.py @@ -103,6 +103,8 @@ class S3Observer: start = time.perf_counter() async with aioboto3.resource("s3") as s3: bucket = await s3.Bucket(self._bucket) + + # we need the key without the bucket name async for file in bucket.objects.filter(Prefix=path): new_cache[file.key] = await file.last_modified end = time.perf_counter() diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt index 34f1334..3402a73 100644 --- a/collection_manager/requirements.txt +++ b/collection_manager/requirements.txt @@ -5,4 +5,5 @@ watchdog==0.10.2 requests==2.23.0 aio-pika==6.6.1 tenacity==6.2.0 -aioboto3==8.0.5 \ No newline at end of file +aioboto3==8.0.5 +aiohttp==3.6.2 \ No newline at end of file
