This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch ascending_latitudes in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit d748429329084cb19966c80393acaa4e53162391 Author: Eamon Ford <[email protected]> AuthorDate: Mon Aug 3 12:51:56 2020 -0700 SDAP-269: Switch to using aio-pika in collection-manager to maintain an asynchronous connection to RabbitMQ (#7) Co-authored-by: Eamon Ford <[email protected]> --- .gitignore | 4 +- collection_manager/README.md | 7 ++- collection_manager/collection_manager/main.py | 36 ++++++------ .../services/CollectionProcessor.py | 8 +-- .../services/CollectionWatcher.py | 45 +++++++++------ .../services/MessagePublisher.py | 41 +++++++------- collection_manager/requirements.txt | 3 +- .../tests/services/test_CollectionProcessor.py | 37 +++++++----- .../tests/services/test_CollectionWatcher.py | 66 ++++++++-------------- common/common/__init__.py | 0 common/common/async_test_utils/AsyncTestUtils.py | 28 +++++++++ common/common/async_test_utils/__init__.py | 1 + common/setup.py | 21 +++++++ granule_ingester/.gitignore | 9 --- granule_ingester/tests/writers/test_SolrStore.py | 5 +- 15 files changed, 178 insertions(+), 133 deletions(-) diff --git a/.gitignore b/.gitignore index e7c73b8..46e4151 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,6 @@ credentials.json token.pickle tmp -sdap_ingest_manager.egg-info venv __pycache__/ dist/ @@ -11,3 +10,6 @@ build/ *.DS_Store .eggs temp/ +*.pyc +*.vscode +*.code-workspace \ No newline at end of file diff --git a/collection_manager/README.md b/collection_manager/README.md index 9d00cbb..771f355 100644 --- a/collection_manager/README.md +++ b/collection_manager/README.md @@ -64,10 +64,11 @@ collections: ``` ## Running the tests -From `incubator-sdap-ingester/collection_manager`, run: +From `incubator-sdap-ingester/`, run: - $ pip install pytest - $ pytest + $ cd common && python setup.py install + $ cd ../collection_manager && python setup.py install + $ pip install pytest && pytest ## Building the Docker image From `incubator-sdap-ingester/collection_manager`, run: diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index 7e72de5..cbe22f9 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -63,28 +63,26 @@ async def main(): history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path) else: history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url) - publisher = MessagePublisher(host=options.rabbitmq_host, - username=options.rabbitmq_username, - password=options.rabbitmq_password, - queue=options.rabbitmq_queue) - publisher.connect() - collection_processor = CollectionProcessor(message_publisher=publisher, - history_manager_builder=history_manager_builder) - collection_watcher = CollectionWatcher(collections_path=options.collections_path, - collection_updated_callback=collection_processor.process_collection, - granule_updated_callback=collection_processor.process_granule, - collections_refresh_interval=int(options.refresh)) + async with MessagePublisher(host=options.rabbitmq_host, + username=options.rabbitmq_username, + password=options.rabbitmq_password, + queue=options.rabbitmq_queue) as publisher: + collection_processor = CollectionProcessor(message_publisher=publisher, + history_manager_builder=history_manager_builder) + collection_watcher = CollectionWatcher(collections_path=options.collections_path, + collection_updated_callback=collection_processor.process_collection, + granule_updated_callback=collection_processor.process_granule, + collections_refresh_interval=int(options.refresh)) - collection_watcher.start_watching() - - while True: - try: - await asyncio.sleep(1) - except KeyboardInterrupt: - return + await collection_watcher.start_watching() + while True: + try: + await asyncio.sleep(1) + except KeyboardInterrupt: + return except Exception as e: - logger.error(e) + logger.exception(e) return diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py index 232cdee..d790f4b 100644 --- a/collection_manager/collection_manager/services/CollectionProcessor.py +++ b/collection_manager/collection_manager/services/CollectionProcessor.py @@ -25,16 +25,16 @@ class CollectionProcessor: with open(MESSAGE_TEMPLATE, 'r') as config_template_file: self._config_template = config_template_file.read() - def process_collection(self, collection: Collection): + async def process_collection(self, collection: Collection): """ Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each. :param collection: A Collection definition :return: None """ for granule in collection.files_owned(): - self.process_granule(granule, collection) + await self.process_granule(granule, collection) - def process_granule(self, granule: str, collection: Collection): + async def process_granule(self, granule: str, collection: Collection): """ Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it. :param granule: A path to a granule file @@ -64,7 +64,7 @@ class CollectionProcessor: return dataset_config = self._fill_template(granule, collection, config_template=self._config_template) - self._publisher.publish_message(body=dataset_config, priority=use_priority) + await self._publisher.publish_message(body=dataset_config, priority=use_priority) history_manager.push(granule) @staticmethod diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 2387016..8911806 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -2,9 +2,7 @@ import asyncio import logging import os from collections import defaultdict -from functools import partial -from typing import Dict, Callable, Set, Optional - +from typing import Dict, Callable, Set, Optional, Awaitable import yaml from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -22,8 +20,8 @@ logger.setLevel(logging.DEBUG) class CollectionWatcher: def __init__(self, collections_path: str, - collection_updated_callback: Callable[[Collection], any], - granule_updated_callback: Callable[[str, Collection], any], + collection_updated_callback: Callable[[Collection], Awaitable], + granule_updated_callback: Callable[[str, Collection], Awaitable], collections_refresh_interval: float = 30): if not os.path.isabs(collections_path): raise RelativePathError("Collections config path must be an absolute path.") @@ -38,7 +36,7 @@ class CollectionWatcher: self._granule_watches = set() - def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None): + async def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None): """ Periodically load the Collections Configuration file to check for changes, and observe filesystem events for added/modified granules. When an event occurs, @@ -46,7 +44,9 @@ class CollectionWatcher: :return: None """ - self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule) + await self._run_periodically(loop=loop, + wait_time=self._collections_refresh_interval, + func=self._reload_and_reschedule) self._observer.start() def collections(self) -> Set[Collection]: @@ -99,11 +99,11 @@ class CollectionWatcher: self._load_collections() return self.collections() - old_collections - def _reload_and_reschedule(self): + async def _reload_and_reschedule(self): try: updated_collections = self._get_updated_collections() for collection in updated_collections: - self._collection_updated_callback(collection) + await self._collection_updated_callback(collection) if len(updated_collections) > 0: self._unschedule_watches() self._schedule_watches() @@ -117,7 +117,9 @@ class CollectionWatcher: def _schedule_watches(self): for directory, collections in self._collections_by_dir.items(): - granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections) + granule_event_handler = _GranuleEventHandler(asyncio.get_running_loop(), + self._granule_updated_callback, + collections) # Note: the Watchdog library does not schedule a new watch # if one is already scheduled for the same directory try: @@ -127,18 +129,23 @@ class CollectionWatcher: logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.") @classmethod - def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args): + async def _run_periodically(cls, + loop: Optional[asyncio.AbstractEventLoop], + wait_time: float, + func: Callable[[any], Awaitable], + *args, + **kwargs): """ Call a function periodically. This uses asyncio, and is non-blocking. :param loop: An optional event loop to use. If None, the current running event loop will be used. :param wait_time: seconds to wait between iterations of func - :param func: the function that will be run + :param func: the async function that will be awaited :param args: any args that need to be provided to func """ if loop is None: loop = asyncio.get_running_loop() - func(*args) - loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func)) + await func(*args, **kwargs) + loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs)) class _GranuleEventHandler(FileSystemEventHandler): @@ -146,7 +153,11 @@ class _GranuleEventHandler(FileSystemEventHandler): EventHandler that watches for new or modified granule files. """ - def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]): + def __init__(self, + loop: asyncio.AbstractEventLoop, + callback: Callable[[str, Collection], Awaitable], + collections_for_dir: Set[Collection]): + self._loop = loop self._callback = callback self._collections_for_dir = collections_for_dir @@ -154,7 +165,7 @@ class _GranuleEventHandler(FileSystemEventHandler): super().on_created(event) for collection in self._collections_for_dir: if collection.owns_file(event.src_path): - self._callback(event.src_path, collection) + self._loop.create_task(self._callback(event.src_path, collection)) def on_modified(self, event): super().on_modified(event) @@ -163,4 +174,4 @@ class _GranuleEventHandler(FileSystemEventHandler): for collection in self._collections_for_dir: if collection.owns_file(event.src_path): - self._callback(event.src_path, collection) + self._loop.create_task(self._callback(event.src_path, collection)) diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py index 559a69d..75803d1 100644 --- a/collection_manager/collection_manager/services/MessagePublisher.py +++ b/collection_manager/collection_manager/services/MessagePublisher.py @@ -1,4 +1,5 @@ -import pika +from aio_pika import Message, DeliveryMode, Connection, Channel, connect_robust +from tenacity import retry, stop_after_attempt, wait_fixed class MessagePublisher: @@ -6,34 +7,34 @@ class MessagePublisher: def __init__(self, host: str, username: str, password: str, queue: str): self._connection_string = f"amqp://{username}:{password}@{host}/" self._queue = queue - self._channel = None - self._connection = None + self._channel: Channel = None + self._connection: Connection = None - def connect(self): + async def __aenter__(self): + await self._connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self._connection: + await self._connection.close() + + async def _connect(self): """ Establish a connection to RabbitMQ. :return: None """ - parameters = pika.URLParameters(self._connection_string) - self._connection = pika.BlockingConnection(parameters) - self._channel = self._connection.channel() - self._channel.queue_declare(self._queue, durable=True) + self._connection = await connect_robust(self._connection_string) + self._channel = await self._connection.channel() + await self._channel.declare_queue(self._queue, durable=True) - def publish_message(self, body: str, priority: int = None): + @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4)) + async def publish_message(self, body: str, priority: int = None): """ Publish a message to RabbitMQ using the optional message priority. :param body: A string to publish to RabbitMQ :param priority: An optional integer priority to use for the message :return: None """ - properties = pika.BasicProperties(content_type='text/plain', - delivery_mode=1, - priority=priority) - self._channel.basic_publish(exchange='', - routing_key=self._queue, - body=body, - properties=properties) - - def __del__(self): - if self._connection: - self._connection.close() + message = Message(body=body.encode('utf-8'), priority=priority, delivery_mode=DeliveryMode.PERSISTENT) + await self._channel.default_exchange.publish(message, routing_key=self._queue) + diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt index f16bde3..7e47c51 100644 --- a/collection_manager/requirements.txt +++ b/collection_manager/requirements.txt @@ -1,6 +1,7 @@ PyYAML==5.3.1 pystache==0.5.4 pysolr==3.8.1 -pika==1.1.0 watchdog==0.10.2 requests==2.23.0 +aio-pika==6.6.1 +tenacity==6.2.0 \ No newline at end of file diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py index 56d5393..aa143f3 100644 --- a/collection_manager/tests/services/test_CollectionProcessor.py +++ b/collection_manager/tests/services/test_CollectionProcessor.py @@ -6,6 +6,7 @@ from collection_manager.entities import Collection from collection_manager.services import CollectionProcessor from collection_manager.services.history_manager import FileIngestionHistoryBuilder from collection_manager.services.history_manager import GranuleStatus +from common.async_test_utils import AsyncMock, async_test class TestCollectionProcessor(unittest.TestCase): @@ -63,10 +64,11 @@ class TestCollectionProcessor(unittest.TestCase): filled = CollectionProcessor._fill_template("/granules/test_granule.nc", collection, template) self.assertEqual(filled, expected) + @async_test @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) - @mock.patch('collection_manager.services.MessagePublisher', autospec=True) - def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history): + @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) + async def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history): mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_HISTORICAL mock_history_builder.build.return_value = mock_history @@ -79,15 +81,17 @@ class TestCollectionProcessor(unittest.TestCase): date_from=None, date_to=None) - collection_processor.process_granule("test.nc", collection) + await collection_processor.process_granule("test.nc", collection) mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1) mock_history.push.assert_called() + @async_test @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) - @mock.patch('collection_manager.services.MessagePublisher', autospec=True) - def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder, mock_history): + @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) + async def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder, + mock_history): mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING mock_history_builder.build.return_value = mock_history @@ -100,15 +104,16 @@ class TestCollectionProcessor(unittest.TestCase): date_from=None, date_to=None) - collection_processor.process_granule("test.h5", collection) + await collection_processor.process_granule("test.h5", collection) mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=2) mock_history.push.assert_called() + @async_test @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) - @mock.patch('collection_manager.services.MessagePublisher', autospec=True) - def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher, + @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) + async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher, mock_history_builder, mock_history): mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING mock_history_builder.build.return_value = mock_history @@ -121,15 +126,16 @@ class TestCollectionProcessor(unittest.TestCase): date_from=None, date_to=None) - collection_processor.process_granule("test.h5", collection) + await collection_processor.process_granule("test.h5", collection) mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1) mock_history.push.assert_called() + @async_test @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) - @mock.patch('collection_manager.services.MessagePublisher', autospec=True) - def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history): + @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) + async def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history): mock_history.get_granule_status.return_value = GranuleStatus.UNDESIRED mock_history_builder.build.return_value = mock_history @@ -142,15 +148,16 @@ class TestCollectionProcessor(unittest.TestCase): date_from=None, date_to=None) - collection_processor.process_granule("test.nc", collection) + await collection_processor.process_granule("test.nc", collection) mock_publisher.publish_message.assert_not_called() mock_history.push.assert_not_called() + @async_test @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) - @mock.patch('collection_manager.services.MessagePublisher', autospec=True) - def test_process_granule_with_unsupported_file_type(self, mock_publisher, mock_history_builder, mock_history): + @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) + async def test_process_granule_with_unsupported_file_type(self, mock_publisher, mock_history_builder, mock_history): mock_history_builder.build.return_value = mock_history collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) @@ -162,7 +169,7 @@ class TestCollectionProcessor(unittest.TestCase): date_from=None, date_to=None) - collection_processor.process_granule("test.foo", collection) + await collection_processor.process_granule("test.foo", collection) mock_publisher.publish_message.assert_not_called() mock_history.push.assert_not_called() diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py index b954812..14e7c3c 100644 --- a/collection_manager/tests/services/test_CollectionWatcher.py +++ b/collection_manager/tests/services/test_CollectionWatcher.py @@ -1,4 +1,3 @@ -import asyncio import os import tempfile import unittest @@ -9,6 +8,7 @@ from collection_manager.entities import Collection from collection_manager.entities.exceptions import CollectionConfigParsingError, CollectionConfigFileNotFoundError, \ RelativePathCollectionError, ConflictingPathCollectionError from collection_manager.services import CollectionWatcher +from common.async_test_utils.AsyncTestUtils import AsyncAssert, AsyncMock, async_test class TestCollectionWatcher(unittest.TestCase): @@ -30,7 +30,7 @@ class TestCollectionWatcher(unittest.TestCase): def test_load_collections_loads_all_collections(self): collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml') - collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + collection_watcher = CollectionWatcher(collections_path, AsyncMock(), AsyncMock()) collection_watcher._load_collections() self.assertEqual(len(collection_watcher._collections_by_dir), 2) @@ -120,7 +120,8 @@ class TestCollectionWatcher(unittest.TestCase): date_to=None) self.assertRaises(ConflictingPathCollectionError, collection_watcher._validate_collection, collection) - def test_collection_callback_is_called(self): + @async_test + async def test_collection_callback_is_called(self): collections_config = tempfile.NamedTemporaryFile("w+b", buffering=0, delete=False) granule_dir = tempfile.TemporaryDirectory() collections_str = f"""collections: @@ -131,14 +132,13 @@ class TestCollectionWatcher(unittest.TestCase): forward-processing-priority: 5""" collections_config.write(collections_str.encode("utf-8")) - collection_callback = Mock() + collection_callback = AsyncMock() collection_watcher = CollectionWatcher(collections_path=collections_config.name, collection_updated_callback=collection_callback, - granule_updated_callback=Mock(), + granule_updated_callback=AsyncMock(), collections_refresh_interval=0.1) - loop = asyncio.new_event_loop() - collection_watcher.start_watching(loop) + await collection_watcher.start_watching() collections_str = f""" - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND @@ -149,14 +149,14 @@ class TestCollectionWatcher(unittest.TestCase): """ collections_config.write(collections_str.encode("utf-8")) - loop.run_until_complete(self.assert_called_within_timeout(collection_callback, call_count=2)) + await AsyncAssert.assert_called_within_timeout(collection_callback, call_count=2) - loop.close() collections_config.close() granule_dir.cleanup() os.remove(collections_config.name) - def test_granule_callback_is_called_on_new_file(self): + @async_test + async def test_granule_callback_is_called_on_new_file(self): with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config: granule_dir = tempfile.TemporaryDirectory() collections_str = f""" @@ -169,21 +169,18 @@ collections: """ collections_config.write(collections_str.encode("utf-8")) - granule_callback = Mock() - collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback) - - loop = asyncio.new_event_loop() - collection_watcher.start_watching(loop) + granule_callback = AsyncMock() + collection_watcher = CollectionWatcher(collections_config.name, AsyncMock(), granule_callback) + await collection_watcher.start_watching() new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+") + await AsyncAssert.assert_called_within_timeout(granule_callback) - loop.run_until_complete(self.assert_called_within_timeout(granule_callback)) - - loop.close() new_granule.close() granule_dir.cleanup() - def test_granule_callback_is_called_on_modified_file(self): + @async_test + async def test_granule_callback_is_called_on_modified_file(self): with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config: granule_dir = tempfile.TemporaryDirectory() collections_str = f""" @@ -197,33 +194,20 @@ collections: collections_config.write(collections_str.encode("utf-8")) new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+") - granule_callback = Mock() - collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback) + granule_callback = AsyncMock() + collection_watcher = CollectionWatcher(collections_config.name, AsyncMock(), granule_callback) - loop = asyncio.new_event_loop() - collection_watcher.start_watching(loop) + await collection_watcher.start_watching() new_granule.write("hello world") new_granule.close() - loop.run_until_complete(self.assert_called_within_timeout(granule_callback)) - loop.close() + await AsyncAssert.assert_called_within_timeout(granule_callback) granule_dir.cleanup() - def test_run_periodically(self): - callback = Mock() - loop = asyncio.new_event_loop() - CollectionWatcher._run_periodically(loop, 0.1, callback) - loop.run_until_complete(self.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2)) - loop.close() - - @staticmethod - async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1): - start = datetime.now() - - while (datetime.now() - start).total_seconds() < timeout_sec: - await asyncio.sleep(0.01) - if mock_func.call_count >= call_count: - return - raise AssertionError(f"{mock_func} did not reach {call_count} calls called within {timeout_sec} sec") + @async_test + async def test_run_periodically(self): + callback = AsyncMock() + await CollectionWatcher._run_periodically(None, 0.1, callback) + await AsyncAssert.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2) diff --git a/common/common/__init__.py b/common/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/common/common/async_test_utils/AsyncTestUtils.py b/common/common/async_test_utils/AsyncTestUtils.py new file mode 100644 index 0000000..ccb829b --- /dev/null +++ b/common/common/async_test_utils/AsyncTestUtils.py @@ -0,0 +1,28 @@ +import asyncio +from datetime import datetime +from unittest import mock + + +class AsyncMock(mock.MagicMock): + async def __call__(self, *args, **kwargs): + return super().__call__(*args, **kwargs) + + +class AsyncAssert: + @staticmethod + async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1): + start = datetime.now() + + while (datetime.now() - start).total_seconds() < timeout_sec: + await asyncio.sleep(0.01) + if mock_func.call_count >= call_count: + return + raise AssertionError(f"Mock did not reach {call_count} calls called within {timeout_sec} sec") + + +def async_test(coro): + def wrapper(*args, **kwargs): + loop = asyncio.new_event_loop() + return loop.run_until_complete(coro(*args, **kwargs)) + + return wrapper diff --git a/common/common/async_test_utils/__init__.py b/common/common/async_test_utils/__init__.py new file mode 100644 index 0000000..12563af --- /dev/null +++ b/common/common/async_test_utils/__init__.py @@ -0,0 +1 @@ +from .AsyncTestUtils import AsyncMock, AsyncAssert, async_test diff --git a/common/setup.py b/common/setup.py new file mode 100644 index 0000000..ed621c8 --- /dev/null +++ b/common/setup.py @@ -0,0 +1,21 @@ +import re + +import setuptools + +PACKAGE_NAME = "sdap_ingester_common" + +setuptools.setup( + name=PACKAGE_NAME, + author="Apache - SDAP", + author_email="[email protected]", + description="a module of common functions for the sdap ingester components", + url="https://github.com/apache/incubator-sdap-ingester", + packages=setuptools.find_packages(), + classifiers=[ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", + "Development Status :: 4 - Beta", + ], + python_requires='>=3.7', + include_package_data=True +) diff --git a/granule_ingester/.gitignore b/granule_ingester/.gitignore deleted file mode 100644 index 5408b74..0000000 --- a/granule_ingester/.gitignore +++ /dev/null @@ -1,9 +0,0 @@ -.vscode -.idea -*.egg-info -*__pycache__ -*.pytest_cache -*.code-workspace -.DS_STORE -build -dist \ No newline at end of file diff --git a/granule_ingester/tests/writers/test_SolrStore.py b/granule_ingester/tests/writers/test_SolrStore.py index 76b85ac..0e971ce 100644 --- a/granule_ingester/tests/writers/test_SolrStore.py +++ b/granule_ingester/tests/writers/test_SolrStore.py @@ -1,4 +1,3 @@ -import asyncio import unittest from nexusproto import DataTile_pb2 as nexusproto @@ -43,8 +42,8 @@ class TestSolrStore(unittest.TestCase): self.assertEqual('test_variable', solr_doc['tile_var_name_s']) self.assertAlmostEqual(-90.5, solr_doc['tile_min_lon']) self.assertAlmostEqual(90.0, solr_doc['tile_max_lon']) - self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat']) - self.assertAlmostEqual(180.2, solr_doc['tile_max_lat']) + self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat'], delta=1E-5) + self.assertAlmostEqual(180.2, solr_doc['tile_max_lat'], delta=1E-5) self.assertEqual('1992-01-01T00:00:00Z', solr_doc['tile_min_time_dt']) self.assertEqual('1992-01-02T00:00:00Z', solr_doc['tile_max_time_dt']) self.assertAlmostEqual(-10.0, solr_doc['tile_min_val_d'])
