This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch async-history in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 3968bd7d19e28bbee5cb33e4c35582b42e11db3d Author: Eamon Ford <[email protected]> AuthorDate: Mon Aug 10 17:29:32 2020 -0700 wip --- .../services/CollectionWatcher.py | 2 -- .../history_manager/FileIngestionHistory.py | 6 +++--- .../history_manager/test_FileIngestionHistory.py | 24 ++++++++++++++-------- .../tests/services/test_CollectionProcessor.py | 2 +- .../granule_ingester/writers/SolrStore.py | 18 +++++----------- 5 files changed, 24 insertions(+), 28 deletions(-) diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 1fd1678..49acceb 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -111,8 +111,6 @@ class CollectionWatcher: self._unschedule_watches() self._schedule_watches() - else: - logger.info("No updated collections, so no files to scan") except CollectionConfigParsingError as e: logger.error(e) diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py index 140ae87..ffa065f 100644 --- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py @@ -66,7 +66,7 @@ class FileIngestionHistory(IngestionHistory): except FileNotFoundError: logger.info(f"history cache {self._history_file_path} does not exist, does not need to be removed") - def _save_latest_timestamp(self): + async def _save_latest_timestamp(self): if self._latest_ingested_file_update: with open(self._latest_ingested_file_update_file_path, 'w') as f_ts: f_ts.write(f'{str(self._latest_ingested_file_update)}\n') @@ -90,10 +90,10 @@ class FileIngestionHistory(IngestionHistory): except FileNotFoundError: logger.info(f"no history file {self._history_file_path} to purge") - def _push_record(self, file_name, signature): + async def _push_record(self, file_name, signature): self._history_dict[file_name] = signature self._history_file.write(f'{file_name},{signature}\n') - def _get_signature(self, file_name): + async def _get_signature(self, file_name): return self._history_dict.get(file_name, None) diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py index d2ad45c..ff1fb3c 100644 --- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py +++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py @@ -3,25 +3,30 @@ import pathlib import tempfile import unittest -from collection_manager.services.history_manager import FileIngestionHistory, md5sum_from_filepath +from collection_manager.services.history_manager import (FileIngestionHistory, + md5sum_from_filepath) + +from common.async_test_utils.AsyncTestUtils import async_test DATASET_ID = "zobi_la_mouche" class TestFileIngestionHistory(unittest.TestCase): - def test_get_md5sum(self): + @async_test + async def test_get_md5sum(self): with tempfile.TemporaryDirectory() as history_dir: ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) - ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") - result = ingestion_history._get_signature("blue") + await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") + result = await ingestion_history._get_signature("blue") self.assertEqual(result, "12weeukrhbwerqu7wier") - def test_get_missing_md5sum(self): + @async_test + async def test_get_missing_md5sum(self): with tempfile.TemporaryDirectory() as history_dir: ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) - ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") - result = ingestion_history._get_signature("green") + await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") + result = await ingestion_history._get_signature("green") self.assertIsNone(result) def test_already_ingested(self): @@ -44,12 +49,13 @@ class TestFileIngestionHistory(unittest.TestCase): del ingestion_history - def test_already_ingested_is_false(self): + @async_test + async def test_already_ingested_is_false(self): with tempfile.TemporaryDirectory() as history_dir: ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) # history_manager with this file current_file_path = pathlib.Path(__file__) - self.assertFalse(ingestion_history.already_ingested(str(current_file_path))) + self.assertFalse(await ingestion_history.already_ingested(str(current_file_path))) if __name__ == '__main__': diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py index aa143f3..fec4726 100644 --- a/collection_manager/tests/services/test_CollectionProcessor.py +++ b/collection_manager/tests/services/test_CollectionProcessor.py @@ -114,7 +114,7 @@ class TestCollectionProcessor(unittest.TestCase): @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock) async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher, - mock_history_builder, mock_history): + mock_history_builder, mock_history): mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING mock_history_builder.build.return_value = mock_history diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index 65f8b09..67532b5 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -21,28 +21,20 @@ from asyncio import AbstractEventLoop from datetime import datetime from pathlib import Path from typing import Dict -from common import AsyncUtils import pysolr -from kazoo.handlers.threading import KazooTimeoutError from kazoo.exceptions import NoNodeError -from nexusproto.DataTile_pb2 import TileSummary, NexusTile +from kazoo.handlers.threading import KazooTimeoutError -from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError +from common.async_utils.AsyncUtils import run_in_executor +from granule_ingester.exceptions import (SolrFailedHealthCheckError, + SolrLostConnectionError) from granule_ingester.writers.MetadataStore import MetadataStore +from nexusproto.DataTile_pb2 import NexusTile, TileSummary logger = logging.getLogger(__name__) -def run_in_executor(f): - @functools.wraps(f) - def inner(*args, **kwargs): - loop = asyncio.get_running_loop() - return loop.run_in_executor(None, lambda: f(*args, **kwargs)) - - return inner - - class SolrStore(MetadataStore): def __init__(self, solr_url=None, zk_url=None): super().__init__()
