This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 252500173ead6154a9b5110ee84604b080be742d Author: Eamon Ford <[email protected]> AuthorDate: Tue Jul 28 15:09:03 2020 -0700 use asyncio in the collection ingester --- collection_manager/collection_manager/main.py | 34 +++++++++--------- .../services/CollectionProcessor.py | 8 ++--- .../services/CollectionWatcher.py | 34 ++++++++++-------- .../services/MessagePublisher.py | 41 +++++++++++----------- collection_manager/requirements.txt | 2 ++ 5 files changed, 63 insertions(+), 56 deletions(-) diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index 43b687e..cbe22f9 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -63,25 +63,23 @@ async def main(): history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path) else: history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url) - publisher = MessagePublisher(host=options.rabbitmq_host, - username=options.rabbitmq_username, - password=options.rabbitmq_password, - queue=options.rabbitmq_queue) - publisher.connect() - collection_processor = CollectionProcessor(message_publisher=publisher, - history_manager_builder=history_manager_builder) - collection_watcher = CollectionWatcher(collections_path=options.collections_path, - collection_updated_callback=collection_processor.process_collection, - granule_updated_callback=collection_processor.process_granule, - collections_refresh_interval=int(options.refresh)) + async with MessagePublisher(host=options.rabbitmq_host, + username=options.rabbitmq_username, + password=options.rabbitmq_password, + queue=options.rabbitmq_queue) as publisher: + collection_processor = CollectionProcessor(message_publisher=publisher, + history_manager_builder=history_manager_builder) + collection_watcher = CollectionWatcher(collections_path=options.collections_path, + collection_updated_callback=collection_processor.process_collection, + granule_updated_callback=collection_processor.process_granule, + collections_refresh_interval=int(options.refresh)) - collection_watcher.start_watching() - - while True: - try: - await asyncio.sleep(1) - except KeyboardInterrupt: - return + await collection_watcher.start_watching() + while True: + try: + await asyncio.sleep(1) + except KeyboardInterrupt: + return except Exception as e: logger.exception(e) diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py index 232cdee..d790f4b 100644 --- a/collection_manager/collection_manager/services/CollectionProcessor.py +++ b/collection_manager/collection_manager/services/CollectionProcessor.py @@ -25,16 +25,16 @@ class CollectionProcessor: with open(MESSAGE_TEMPLATE, 'r') as config_template_file: self._config_template = config_template_file.read() - def process_collection(self, collection: Collection): + async def process_collection(self, collection: Collection): """ Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each. :param collection: A Collection definition :return: None """ for granule in collection.files_owned(): - self.process_granule(granule, collection) + await self.process_granule(granule, collection) - def process_granule(self, granule: str, collection: Collection): + async def process_granule(self, granule: str, collection: Collection): """ Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it. :param granule: A path to a granule file @@ -64,7 +64,7 @@ class CollectionProcessor: return dataset_config = self._fill_template(granule, collection, config_template=self._config_template) - self._publisher.publish_message(body=dataset_config, priority=use_priority) + await self._publisher.publish_message(body=dataset_config, priority=use_priority) history_manager.push(granule) @staticmethod diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 2387016..0d7da84 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -2,7 +2,6 @@ import asyncio import logging import os from collections import defaultdict -from functools import partial from typing import Dict, Callable, Set, Optional import yaml @@ -38,7 +37,7 @@ class CollectionWatcher: self._granule_watches = set() - def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None): + async def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None): """ Periodically load the Collections Configuration file to check for changes, and observe filesystem events for added/modified granules. When an event occurs, @@ -46,7 +45,7 @@ class CollectionWatcher: :return: None """ - self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule) + await self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule) self._observer.start() def collections(self) -> Set[Collection]: @@ -99,11 +98,11 @@ class CollectionWatcher: self._load_collections() return self.collections() - old_collections - def _reload_and_reschedule(self): + async def _reload_and_reschedule(self): try: updated_collections = self._get_updated_collections() for collection in updated_collections: - self._collection_updated_callback(collection) + await self._collection_updated_callback(collection) if len(updated_collections) > 0: self._unschedule_watches() self._schedule_watches() @@ -117,7 +116,9 @@ class CollectionWatcher: def _schedule_watches(self): for directory, collections in self._collections_by_dir.items(): - granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections) + granule_event_handler = _GranuleEventHandler(asyncio.get_running_loop(), + self._granule_updated_callback, + collections) # Note: the Watchdog library does not schedule a new watch # if one is already scheduled for the same directory try: @@ -127,18 +128,22 @@ class CollectionWatcher: logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.") @classmethod - def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args): + async def _run_periodically(cls, + loop: Optional[asyncio.AbstractEventLoop], + wait_time: float, + coro, + *args): """ Call a function periodically. This uses asyncio, and is non-blocking. :param loop: An optional event loop to use. If None, the current running event loop will be used. :param wait_time: seconds to wait between iterations of func - :param func: the function that will be run + :param coro: the coroutine that will be awaited :param args: any args that need to be provided to func """ if loop is None: loop = asyncio.get_running_loop() - func(*args) - loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func)) + await coro(*args) + loop.call_later(wait_time, asyncio.create_task, cls._run_periodically(loop, wait_time, coro)) class _GranuleEventHandler(FileSystemEventHandler): @@ -146,15 +151,16 @@ class _GranuleEventHandler(FileSystemEventHandler): EventHandler that watches for new or modified granule files. """ - def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]): - self._callback = callback + def __init__(self, loop: asyncio.AbstractEventLoop, callback_coro, collections_for_dir: Set[Collection]): + self._loop = loop + self._callback_coro = callback_coro self._collections_for_dir = collections_for_dir def on_created(self, event): super().on_created(event) for collection in self._collections_for_dir: if collection.owns_file(event.src_path): - self._callback(event.src_path, collection) + self._loop.create_task(self._callback_coro(event.src_path, collection)) def on_modified(self, event): super().on_modified(event) @@ -163,4 +169,4 @@ class _GranuleEventHandler(FileSystemEventHandler): for collection in self._collections_for_dir: if collection.owns_file(event.src_path): - self._callback(event.src_path, collection) + self._loop.create_task(self._callback_coro(event.src_path, collection)) diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py index f7a5517..75803d1 100644 --- a/collection_manager/collection_manager/services/MessagePublisher.py +++ b/collection_manager/collection_manager/services/MessagePublisher.py @@ -1,4 +1,5 @@ -import pika +from aio_pika import Message, DeliveryMode, Connection, Channel, connect_robust +from tenacity import retry, stop_after_attempt, wait_fixed class MessagePublisher: @@ -6,34 +7,34 @@ class MessagePublisher: def __init__(self, host: str, username: str, password: str, queue: str): self._connection_string = f"amqp://{username}:{password}@{host}/" self._queue = queue - self._channel = None - self._connection = None + self._channel: Channel = None + self._connection: Connection = None - def connect(self): + async def __aenter__(self): + await self._connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self._connection: + await self._connection.close() + + async def _connect(self): """ Establish a connection to RabbitMQ. :return: None """ - parameters = pika.URLParameters(self._connection_string) - self._connection = pika.BlockingConnection(parameters) - self._channel = self._connection.channel() - self._channel.queue_declare(self._queue, durable=True) + self._connection = await connect_robust(self._connection_string) + self._channel = await self._connection.channel() + await self._channel.declare_queue(self._queue, durable=True) - def publish_message(self, body: str, priority: int = None): + @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4)) + async def publish_message(self, body: str, priority: int = None): """ Publish a message to RabbitMQ using the optional message priority. :param body: A string to publish to RabbitMQ :param priority: An optional integer priority to use for the message :return: None """ - properties = pika.BasicProperties(content_type='text/plain', - delivery_mode=2, - priority=priority) - self._channel.basic_publish(exchange='', - routing_key=self._queue, - body=body, - properties=properties) - - def __del__(self): - if self._connection: - self._connection.close() + message = Message(body=body.encode('utf-8'), priority=priority, delivery_mode=DeliveryMode.PERSISTENT) + await self._channel.default_exchange.publish(message, routing_key=self._queue) + diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt index f16bde3..47ae867 100644 --- a/collection_manager/requirements.txt +++ b/collection_manager/requirements.txt @@ -4,3 +4,5 @@ pysolr==3.8.1 pika==1.1.0 watchdog==0.10.2 requests==2.23.0 +aio-pika==6.6.1 +tenacity==6.2.0 \ No newline at end of file
