This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch ascending_latitudes in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 0aa6121320995a1d6c0c7101b86b7d1af15cdabd Author: Eamon Ford <[email protected]> AuthorDate: Tue Aug 11 11:45:26 2020 -0700 SDAP-280: Collection Manager to talk to Ingestion History asynchronously (#16) --- collection_manager/README.md | 14 ++++---- .../services/CollectionProcessor.py | 8 ++--- .../services/CollectionWatcher.py | 15 +++++---- .../history_manager/FileIngestionHistory.py | 9 ++--- .../services/history_manager/IngestionHistory.py | 26 +++++++-------- .../history_manager/SolrIngestionHistory.py | 11 +++---- collection_manager/docker/Dockerfile | 12 ++++--- collection_manager/requirements.txt | 2 +- .../history_manager/test_FileIngestionHistory.py | 38 +++++++++++++--------- .../tests/services/test_CollectionProcessor.py | 16 +++++---- .../tests/services/test_CollectionWatcher.py | 1 - common/common/async_utils/AsyncUtils.py | 11 +++++++ common/common/async_utils/__init__.py | 1 + granule_ingester/README.md | 21 ++++++------ granule_ingester/docker/Dockerfile | 14 ++++---- .../granule_ingester/writers/SolrStore.py | 17 +++------- 16 files changed, 118 insertions(+), 98 deletions(-) diff --git a/collection_manager/README.md b/collection_manager/README.md index 771f355..84df468 100644 --- a/collection_manager/README.md +++ b/collection_manager/README.md @@ -12,15 +12,15 @@ Manager service will publish a message to RabbitMQ to be picked up by the Granul Python 3.7 ## Building the service -From `incubator-sdap-ingester/collection_manager`, run: - - $ python setup.py install +From `incubator-sdap-ingester`, run: + $ cd common && python setup.py install + $ cd ../collection_manager python setup.py install ## Running the service -From `incubator-sdap-ingester/collection_manager`, run: +From `incubator-sdap-ingester`, run: - $ python collection_manager/main.py -h + $ python collection_manager/collection_manager/main.py -h ### The Collections Configuration File @@ -71,6 +71,6 @@ From `incubator-sdap-ingester/`, run: $ pip install pytest && pytest ## Building the Docker image -From `incubator-sdap-ingester/collection_manager`, run: +From `incubator-sdap-ingester`, run: - $ docker build . -f docker/Dockerfile -t nexusjpl/collection-manager + $ docker build . -f collection_manager/docker/Dockerfile -t nexusjpl/collection-manager diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py index d790f4b..ac61586 100644 --- a/collection_manager/collection_manager/services/CollectionProcessor.py +++ b/collection_manager/collection_manager/services/CollectionProcessor.py @@ -45,7 +45,7 @@ class CollectionProcessor: return history_manager = self._get_history_manager(collection.dataset_id) - granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to) + granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to) if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING: logger.info(f"New granule '{granule}' detected for forward-processing ingestion " @@ -59,13 +59,13 @@ class CollectionProcessor: f"'{collection.dataset_id}'.") use_priority = collection.historical_priority else: - logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired " - f"time range for collection '{collection.dataset_id}'. Skipping.") + logger.debug(f"Granule '{granule}' detected but has already been ingested in " + f"collection '{collection.dataset_id}'. Skipping.") return dataset_config = self._fill_template(granule, collection, config_template=self._config_template) await self._publisher.publish_message(body=dataset_config, priority=use_priority) - history_manager.push(granule) + await history_manager.push(granule) @staticmethod def _file_supported(file_path: str): diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 8f67e16..54c8877 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -1,18 +1,19 @@ import asyncio -import time import logging import os +import time from collections import defaultdict -from typing import Dict, Callable, Set, Optional, Awaitable +from typing import Awaitable, Callable, Dict, Optional, Set + import yaml +from collection_manager.entities import Collection +from collection_manager.entities.exceptions import ( + CollectionConfigFileNotFoundError, CollectionConfigParsingError, + ConflictingPathCollectionError, MissingValueCollectionError, + RelativePathCollectionError, RelativePathError) from watchdog.events import FileSystemEventHandler from watchdog.observers.polling import PollingObserver as Observer -from collection_manager.entities import Collection -from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \ - CollectionConfigFileNotFoundError, MissingValueCollectionError, ConflictingPathCollectionError, \ - RelativePathCollectionError - logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py index 50f2170..ffa065f 100644 --- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py @@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory): Constructor :param history_path: :param dataset_id: - :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp) + :param signature_fun: function which creates the signature of the cache, + a file path string as argument and returns a string (md5sum, time stamp) """ self._dataset_id = dataset_id self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv') @@ -65,7 +66,7 @@ class FileIngestionHistory(IngestionHistory): except FileNotFoundError: logger.info(f"history cache {self._history_file_path} does not exist, does not need to be removed") - def _save_latest_timestamp(self): + async def _save_latest_timestamp(self): if self._latest_ingested_file_update: with open(self._latest_ingested_file_update_file_path, 'w') as f_ts: f_ts.write(f'{str(self._latest_ingested_file_update)}\n') @@ -89,10 +90,10 @@ class FileIngestionHistory(IngestionHistory): except FileNotFoundError: logger.info(f"no history file {self._history_file_path} to purge") - def _push_record(self, file_name, signature): + async def _push_record(self, file_name, signature): self._history_dict[file_name] = signature self._history_file.write(f'{file_name},{signature}\n') - def _get_signature(self, file_name): + async def _get_signature(self, file_name): return self._history_dict.get(file_name, None) diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py index d92cb24..ef73ccb 100644 --- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py @@ -37,7 +37,7 @@ class IngestionHistory(ABC): _signature_fun = None _latest_ingested_file_update = None - def push(self, file_path: str): + async def push(self, file_path: str): """ Record a file as having been ingested. :param file_path: The full path to the file to record. @@ -46,14 +46,14 @@ class IngestionHistory(ABC): file_path = file_path.strip() file_name = os.path.basename(file_path) signature = self._signature_fun(file_path) - self._push_record(file_name, signature) + await self._push_record(file_name, signature) if not self._latest_ingested_file_update: self._latest_ingested_file_update = os.path.getmtime(file_path) else: self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path)) - self._save_latest_timestamp() + await self._save_latest_timestamp() def latest_ingested_mtime(self) -> Optional[datetime]: """ @@ -65,7 +65,7 @@ class IngestionHistory(ABC): else: return None - def already_ingested(self, file_path: str) -> bool: + async def already_ingested(self, file_path: str) -> bool: """ Return a boolean indicating whether the specified file has already been ingested, based on its signature. :param file_path: The full path of a file to search for in the history. @@ -74,12 +74,12 @@ class IngestionHistory(ABC): file_path = file_path.strip() file_name = os.path.basename(file_path) signature = self._signature_fun(file_path) - return signature == self._get_signature(file_name) + return signature == await self._get_signature(file_name) - def get_granule_status(self, - file_path: str, - date_from: datetime = None, - date_to: datetime = None) -> GranuleStatus: + async def get_granule_status(self, + file_path: str, + date_from: datetime = None, + date_to: datetime = None) -> GranuleStatus: """ Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested and and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime). @@ -96,21 +96,21 @@ class IngestionHistory(ABC): """ if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()): return GranuleStatus.DESIRED_FORWARD_PROCESSING - elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path): + elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path): return GranuleStatus.DESIRED_HISTORICAL else: return GranuleStatus.UNDESIRED @abstractmethod - def _save_latest_timestamp(self): + async def _save_latest_timestamp(self): pass @abstractmethod - def _push_record(self, file_name, signature): + async def _push_record(self, file_name, signature): pass @abstractmethod - def _get_signature(self, file_name): + async def _get_signature(self, file_name): pass @staticmethod diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py index 79d6eef..59f5cd7 100644 --- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py @@ -4,10 +4,12 @@ import logging import pysolr import requests +from common.async_utils.AsyncUtils import run_in_executor from collection_manager.services.history_manager.IngestionHistory import IngestionHistory from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath +logging.getLogger("pysolr").setLevel(logging.WARNING) logger = logging.getLogger(__name__) @@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory): def __del__(self): self._req_session.close() + @run_in_executor def _push_record(self, file_name, signature): hash_id = doc_key(self._dataset_id, file_name) self._solr_granules.delete(q=f"id:{hash_id}") @@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory): self._solr_granules.commit() return None + @run_in_executor def _save_latest_timestamp(self): if self._solr_datasets: self._solr_datasets.delete(q=f"id:{self._dataset_id}") @@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory): else: return None + @run_in_executor def _get_signature(self, file_name): hash_id = doc_key(self._dataset_id, file_name) results = self._solr_granules.search(q=f"id:{hash_id}") @@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory): self._add_field(schema_endpoint, "granule_s", "string") self._add_field(schema_endpoint, "granule_signature_s", "string") - else: - logger.info(f"collection {self._granule_collection_name} already exists") - if self._dataset_collection_name not in existing_collections: # Create collection payload = {'action': 'CREATE', @@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory): self._add_field(schema_endpoint, "dataset_s", "string") self._add_field(schema_endpoint, "latest_update_l", "TrieLongField") - else: - logger.info(f"collection {self._dataset_collection_name} already exists") - except requests.exceptions.RequestException as e: logger.error(f"solr instance unreachable {self._solr_url}") raise e diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile index ce1b577..2a57784 100644 --- a/collection_manager/docker/Dockerfile +++ b/collection_manager/docker/Dockerfile @@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list RUN apt-get update && apt-get install -y kubectl -COPY /collection_manager /collection_manager/collection_manager -COPY /setup.py /collection_manager/setup.py -COPY /requirements.txt /collection_manager/requirements.txt -COPY /README.md /collection_manager/README.md -COPY /docker/entrypoint.sh /entrypoint.sh +COPY common /common +COPY collection_manager/collection_manager /collection_manager/collection_manager +COPY collection_manager/setup.py /collection_manager/setup.py +COPY collection_manager/requirements.txt /collection_manager/requirements.txt +COPY collection_manager/README.md /collection_manager/README.md +COPY collection_manager/docker/entrypoint.sh /entrypoint.sh +RUN cd /common && python setup.py install RUN cd /collection_manager && python setup.py install ENTRYPOINT ["/bin/bash", "/entrypoint.sh"] diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt index 7e47c51..ee12c89 100644 --- a/collection_manager/requirements.txt +++ b/collection_manager/requirements.txt @@ -1,6 +1,6 @@ PyYAML==5.3.1 pystache==0.5.4 -pysolr==3.8.1 +pysolr==3.9.0 watchdog==0.10.2 requests==2.23.0 aio-pika==6.6.1 diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py index d2ad45c..07ab0e1 100644 --- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py +++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py @@ -3,53 +3,61 @@ import pathlib import tempfile import unittest -from collection_manager.services.history_manager import FileIngestionHistory, md5sum_from_filepath +from collection_manager.services.history_manager import (FileIngestionHistory, + md5sum_from_filepath) + +from common.async_test_utils.AsyncTestUtils import async_test DATASET_ID = "zobi_la_mouche" class TestFileIngestionHistory(unittest.TestCase): - def test_get_md5sum(self): + @async_test + async def test_get_md5sum(self): with tempfile.TemporaryDirectory() as history_dir: ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) - ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") - result = ingestion_history._get_signature("blue") + await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") + result = await ingestion_history._get_signature("blue") self.assertEqual(result, "12weeukrhbwerqu7wier") - def test_get_missing_md5sum(self): + @async_test + async def test_get_missing_md5sum(self): with tempfile.TemporaryDirectory() as history_dir: ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) - ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") - result = ingestion_history._get_signature("green") + await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") + result = await ingestion_history._get_signature("green") self.assertIsNone(result) - def test_already_ingested(self): + @async_test + async def test_already_ingested(self): with tempfile.TemporaryDirectory() as history_dir: ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) # history_manager with this file current_file_path = pathlib.Path(__file__) - ingestion_history.push(str(current_file_path)) - self.assertTrue(ingestion_history.already_ingested(str(current_file_path))) + await ingestion_history.push(str(current_file_path)) + self.assertTrue(await ingestion_history.already_ingested(str(current_file_path))) del ingestion_history - def test_already_ingested_with_latest_modifcation_signature(self): + @async_test + async def test_already_ingested_with_latest_modifcation_signature(self): with tempfile.TemporaryDirectory() as history_dir: ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, os.path.getmtime) # history_manager with this file current_file_path = pathlib.Path(__file__) - ingestion_history.push(str(current_file_path)) - self.assertTrue(ingestion_history.already_ingested(str(current_file_path))) + await ingestion_history.push(str(current_file_path)) + self.assertTrue(await ingestion_history.already_ingested(str(current_file_path))) del ingestion_history - def test_already_ingested_is_false(self): + @async_test + async def test_already_ingested_is_false(self): with tempfile.TemporaryDirectory() as history_dir: ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) # history_manager with this file current_file_path = pathlib.Path(__file__) - self.assertFalse(ingestion_history.already_ingested(str(current_file_path))) + self.assertFalse(await ingestion_history.already_ingested(str(current_file_path))) if __name__ == '__main__': diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py index aa143f3..a7059d6 100644 --- a/collection_manager/tests/services/test_CollectionProcessor.py +++ b/collection_manager/tests/services/test_CollectionProcessor.py @@ -65,7 +65,7 @@ class TestCollectionProcessor(unittest.TestCase): self.assertEqual(filled, expected) @async_test - @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock) @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) async def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history): @@ -87,10 +87,12 @@ class TestCollectionProcessor(unittest.TestCase): mock_history.push.assert_called() @async_test - @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) - @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) - async def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder, + async def test_process_granule_with_forward_processing_granule(self, + mock_publisher, + mock_history_builder, mock_history): mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING mock_history_builder.build.return_value = mock_history @@ -110,11 +112,11 @@ class TestCollectionProcessor(unittest.TestCase): mock_history.push.assert_called() @async_test - @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock) @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher, - mock_history_builder, mock_history): + mock_history_builder, mock_history): mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING mock_history_builder.build.return_value = mock_history @@ -132,7 +134,7 @@ class TestCollectionProcessor(unittest.TestCase): mock_history.push.assert_called() @async_test - @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock) @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) async def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history): diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py index 14e7c3c..c9a75c0 100644 --- a/collection_manager/tests/services/test_CollectionWatcher.py +++ b/collection_manager/tests/services/test_CollectionWatcher.py @@ -210,4 +210,3 @@ collections: callback = AsyncMock() await CollectionWatcher._run_periodically(None, 0.1, callback) await AsyncAssert.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2) - diff --git a/common/common/async_utils/AsyncUtils.py b/common/common/async_utils/AsyncUtils.py new file mode 100644 index 0000000..5fefd45 --- /dev/null +++ b/common/common/async_utils/AsyncUtils.py @@ -0,0 +1,11 @@ +import asyncio +import functools + + +def run_in_executor(f): + @functools.wraps(f) + def inner(*args, **kwargs): + loop = asyncio.get_running_loop() + return loop.run_in_executor(None, lambda: f(*args, **kwargs)) + + return inner diff --git a/common/common/async_utils/__init__.py b/common/common/async_utils/__init__.py new file mode 100644 index 0000000..9a468e0 --- /dev/null +++ b/common/common/async_utils/__init__.py @@ -0,0 +1 @@ +from .AsyncUtils import run_in_executor diff --git a/granule_ingester/README.md b/granule_ingester/README.md index 112f52d..1339835 100644 --- a/granule_ingester/README.md +++ b/granule_ingester/README.md @@ -12,23 +12,24 @@ data to Cassandra and Solr. Python 3.7 ## Building the service -From `incubator-sdap-ingester/granule_ingester`, run: - - $ python setup.py install +From `incubator-sdap-ingester`, run: + $ cd common && python setup.py install + $ cd ../granule_ingester && python setup.py install ## Launching the service -From `incubator-sdap-ingester/granule_ingester`, run: +From `incubator-sdap-ingester`, run: - $ python granule_ingester/main.py -h + $ python granule_ingester/granule_ingester/main.py -h ## Running the tests -From `incubator-sdap-ingester/granule_ingester`, run: +From `incubator-sdap-ingester`, run: - $ pip install pytest - $ pytest + $ cd common && python setup.py install + $ cd ../granule_ingester && python setup.py install + $ pip install pytest && pytest ## Building the Docker image -From `incubator-sdap-ingester/granule_ingester`, run: +From `incubator-sdap-ingester`, run: - $ docker build . -f docker/Dockerfile -t nexusjpl/granule-ingester + $ docker build . -f granule_ingester/docker/Dockerfile -t nexusjpl/granule-ingester diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile index 4b25318..57bacff 100644 --- a/granule_ingester/docker/Dockerfile +++ b/granule_ingester/docker/Dockerfile @@ -6,14 +6,16 @@ ENV PATH="/opt/conda/bin:$PATH" RUN apk update --no-cache && apk add --no-cache --virtual .build-deps git openjdk8 -COPY /granule_ingester /sdap/granule_ingester -COPY /setup.py /sdap/setup.py -COPY /requirements.txt /sdap/requirements.txt -COPY /conda-requirements.txt /sdap/conda-requirements.txt -COPY /docker/install_nexusproto.sh /install_nexusproto.sh -COPY /docker/entrypoint.sh /entrypoint.sh +COPY common /common +COPY granule_ingester/granule_ingester /sdap/granule_ingester +COPY granule_ingester/setup.py /sdap/setup.py +COPY granule_ingester/requirements.txt /sdap/requirements.txt +COPY granule_ingester/conda-requirements.txt /sdap/conda-requirements.txt +COPY granule_ingester/docker/install_nexusproto.sh /install_nexusproto.sh +COPY granule_ingester/docker/entrypoint.sh /entrypoint.sh RUN ./install_nexusproto.sh +RUN cd /common && python setup.py install RUN cd /sdap && python setup.py install RUN apk del .build-deps diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index e098672..67532b5 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -23,25 +23,18 @@ from pathlib import Path from typing import Dict import pysolr -from kazoo.handlers.threading import KazooTimeoutError from kazoo.exceptions import NoNodeError -from nexusproto.DataTile_pb2 import TileSummary, NexusTile +from kazoo.handlers.threading import KazooTimeoutError -from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError +from common.async_utils.AsyncUtils import run_in_executor +from granule_ingester.exceptions import (SolrFailedHealthCheckError, + SolrLostConnectionError) from granule_ingester.writers.MetadataStore import MetadataStore +from nexusproto.DataTile_pb2 import NexusTile, TileSummary logger = logging.getLogger(__name__) -def run_in_executor(f): - @functools.wraps(f) - def inner(*args, **kwargs): - loop = asyncio.get_running_loop() - return loop.run_in_executor(None, lambda: f(*args, **kwargs)) - - return inner - - class SolrStore(MetadataStore): def __init__(self, solr_url=None, zk_url=None): super().__init__()
