This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch s3-support in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 16a636e7df5a0968a5c643c611835bb29cf38830 Author: Eamon Ford <[email protected]> AuthorDate: Tue Oct 6 13:15:23 2020 -0700 Create S3Observer --- .../services/CollectionWatcher.py | 14 +-- .../collection_manager/services/S3Observer.py | 140 +++++++++++++++++++++ .../collection_manager/services/__init__.py | 1 + collection_manager/requirements.txt | 3 +- 4 files changed, 150 insertions(+), 8 deletions(-) diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 20ec7c7..499a17d 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -12,7 +12,7 @@ from collection_manager.entities.exceptions import ( CollectionConfigFileNotFoundError, CollectionConfigParsingError, ConflictingPathCollectionError, MissingValueCollectionError, RelativePathCollectionError, RelativePathError) -from watchdog.events import FileSystemEventHandler +from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler from watchdog.observers.polling import PollingObserver as Observer logger = logging.getLogger(__name__) @@ -179,9 +179,9 @@ class _GranuleEventHandler(FileSystemEventHandler): def on_modified(self, event): super().on_modified(event) - if os.path.isdir(event.src_path): - return - - for collection in self._collections_for_dir: - if collection.owns_file(event.src_path): - self._loop.create_task(self._callback(event.src_path, collection)) + # if os.path.isdir(event.src_path): + # return + if type(event) == FileModifiedEvent: + for collection in self._collections_for_dir: + if collection.owns_file(event.src_path): + self._loop.create_task(self._callback(event.src_path, collection)) diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py new file mode 100644 index 0000000..9a86d1e --- /dev/null +++ b/collection_manager/collection_manager/services/S3Observer.py @@ -0,0 +1,140 @@ +import asyncio +import datetime +import os +import time +from dataclasses import dataclass +from typing import Set, Dict, Optional, Callable, Awaitable + +import aioboto3 + +os.environ['AWS_PROFILE'] = "saml-pub" +os.environ['AWS_DEFAULT_REGION'] = "us-west-2" + + +@dataclass +class S3Event: + src_path: str + + +class S3FileModifiedEvent(S3Event): + pass + + +class S3FileCreatedEvent(S3Event): + pass + + +class S3Watch(object): + def __init__(self, path: str, event_handler) -> None: + self.path = path + self.event_handler = event_handler + + +class S3Observer: + + def __init__(self, bucket, initial_scan=False) -> None: + self._bucket = bucket + self._cache: Dict[str, datetime.datetime] = {} + self._initial_scan = initial_scan + self._watches: Set[S3Watch] = set() + + self._has_polled = False + + async def start(self): + await self._run_periodically(loop=None, + wait_time=30, + func=self._poll) + + def unschedule(self, watch: S3Watch): + self._watches.remove(watch) + + def schedule(self, path: str, event_handler): + watch = S3Watch(path=path, event_handler=event_handler) + self._watches.add(watch) + return watch + + @classmethod + async def _run_periodically(cls, + loop: Optional[asyncio.AbstractEventLoop], + wait_time: float, + func: Callable[[any], Awaitable], + *args, + **kwargs): + """ + Call a function periodically. This uses asyncio, and is non-blocking. + :param loop: An optional event loop to use. If None, the current running event loop will be used. + :param wait_time: seconds to wait between iterations of func + :param func: the async function that will be awaited + :param args: any args that need to be provided to func + """ + if loop is None: + loop = asyncio.get_running_loop() + await func(*args, **kwargs) + loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs)) + + async def _poll(self): + new_cache = {} + watch_index = {} + + for watch in self._watches: + new_cache_for_watch = await self._get_s3_files(watch.path) + new_index = {file: watch for file in new_cache_for_watch} + + new_cache = {**new_cache, **new_cache_for_watch} + watch_index = {**watch_index, **new_index} + difference = set(new_cache.items()) - set(self._cache.items()) + + if self._has_polled or self._initial_scan: + for (file, modified_date) in difference: + watch = watch_index[file] + file_is_new = file not in self._cache + + if file_is_new: + watch.event_handler.on_created(S3FileCreatedEvent(src_path=file)) + else: + watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file)) + + self._cache = new_cache + self._has_polled = True + + async def _get_s3_files(self, path: str): + new_cache = {} + + start = time.perf_counter() + async with aioboto3.resource("s3") as s3: + bucket = await s3.Bucket(self._bucket) + async for file in bucket.objects.filter(Prefix=path): + new_cache[file.key] = await file.last_modified + end = time.perf_counter() + duration = end - start + + print(f"Retrieved {len(new_cache)} objects in {duration}") + + return new_cache + + +async def test(): + observer = S3Observer(bucket="nexus-ingest", initial_scan=False) + handler = Handler() + observer.schedule('avhrr/2012', handler) + observer.schedule('avhrr/2013', handler) + + await observer.start() + + while True: + try: + await asyncio.sleep(1) + except KeyboardInterrupt: + return + + +class Handler: + def on_created(self, event: S3Event): + print(f"File created: {event.src_path}") + + def on_modified(self, event: S3Event): + print(f"File modified: {event.src_path}") + + +if __name__ == "__main__": + asyncio.run(test()) diff --git a/collection_manager/collection_manager/services/__init__.py b/collection_manager/collection_manager/services/__init__.py index 635d3dc..553e1b7 100644 --- a/collection_manager/collection_manager/services/__init__.py +++ b/collection_manager/collection_manager/services/__init__.py @@ -16,3 +16,4 @@ from .CollectionProcessor import CollectionProcessor from .CollectionWatcher import CollectionWatcher from .MessagePublisher import MessagePublisher +from .S3Observer import S3Observer diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt index ee12c89..34f1334 100644 --- a/collection_manager/requirements.txt +++ b/collection_manager/requirements.txt @@ -4,4 +4,5 @@ pysolr==3.9.0 watchdog==0.10.2 requests==2.23.0 aio-pika==6.6.1 -tenacity==6.2.0 \ No newline at end of file +tenacity==6.2.0 +aioboto3==8.0.5 \ No newline at end of file
