This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 3cadbd69f79470f18881897d872d7ebdb4ac3a30 Author: Eamon Ford <[email protected]> AuthorDate: Tue Jul 28 15:09:03 2020 -0700 use asyncio in the collection ingester --- .../services/CollectionWatcher.py | 32 ++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 8911806..00afce4 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -44,9 +44,13 @@ class CollectionWatcher: :return: None """ +<<<<<<< HEAD await self._run_periodically(loop=loop, wait_time=self._collections_refresh_interval, func=self._reload_and_reschedule) +======= + await self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule) +>>>>>>> use asyncio in the collection ingester self._observer.start() def collections(self) -> Set[Collection]: @@ -132,20 +136,34 @@ class CollectionWatcher: async def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, +<<<<<<< HEAD func: Callable[[any], Awaitable], *args, **kwargs): +======= + coro, + *args): +>>>>>>> use asyncio in the collection ingester """ Call a function periodically. This uses asyncio, and is non-blocking. :param loop: An optional event loop to use. If None, the current running event loop will be used. 
:param wait_time: seconds to wait between iterations of func +<<<<<<< HEAD :param func: the async function that will be awaited +======= + :param coro: the coroutine that will be awaited +>>>>>>> use asyncio in the collection ingester :param args: any args that need to be provided to func """ if loop is None: loop = asyncio.get_running_loop() +<<<<<<< HEAD await func(*args, **kwargs) loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs)) +======= + await coro(*args) + loop.call_later(wait_time, asyncio.create_task, cls._run_periodically(loop, wait_time, coro)) +>>>>>>> use asyncio in the collection ingester class _GranuleEventHandler(FileSystemEventHandler): @@ -153,19 +171,29 @@ class _GranuleEventHandler(FileSystemEventHandler): EventHandler that watches for new or modified granule files. """ +<<<<<<< HEAD def __init__(self, loop: asyncio.AbstractEventLoop, callback: Callable[[str, Collection], Awaitable], collections_for_dir: Set[Collection]): self._loop = loop self._callback = callback +======= + def __init__(self, loop: asyncio.AbstractEventLoop, callback_coro, collections_for_dir: Set[Collection]): + self._loop = loop + self._callback_coro = callback_coro +>>>>>>> use asyncio in the collection ingester self._collections_for_dir = collections_for_dir def on_created(self, event): super().on_created(event) for collection in self._collections_for_dir: if collection.owns_file(event.src_path): +<<<<<<< HEAD self._loop.create_task(self._callback(event.src_path, collection)) +======= + self._loop.create_task(self._callback_coro(event.src_path, collection)) +>>>>>>> use asyncio in the collection ingester def on_modified(self, event): super().on_modified(event) @@ -174,4 +202,8 @@ class _GranuleEventHandler(FileSystemEventHandler): for collection in self._collections_for_dir: if collection.owns_file(event.src_path): +<<<<<<< HEAD self._loop.create_task(self._callback(event.src_path, collection)) +======= + 
self._loop.create_task(self._callback_coro(event.src_path, collection)) +>>>>>>> use asyncio in the collection ingester
