This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch async-history in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit a5d9a8f1c55fd359e82d26caac825cdd2aa256ae Author: Eamon Ford <[email protected]> AuthorDate: Thu Aug 6 16:15:11 2020 -0700 async solr history --- .../services/CollectionProcessor.py | 8 +++---- .../services/CollectionWatcher.py | 1 - .../history_manager/FileIngestionHistory.py | 3 ++- .../services/history_manager/IngestionHistory.py | 26 +++++++++++----------- .../history_manager/SolrIngestionHistory.py | 11 +++++---- collection_manager/docker/Dockerfile | 12 +++++----- config_operator/config_operator/main.py | 1 + .../granule_ingester/writers/SolrStore.py | 1 + 8 files changed, 33 insertions(+), 30 deletions(-) diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py index d790f4b..fc91e01 100644 --- a/collection_manager/collection_manager/services/CollectionProcessor.py +++ b/collection_manager/collection_manager/services/CollectionProcessor.py @@ -45,7 +45,7 @@ class CollectionProcessor: return history_manager = self._get_history_manager(collection.dataset_id) - granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to) + granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to) if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING: logger.info(f"New granule '{granule}' detected for forward-processing ingestion " @@ -59,13 +59,13 @@ class CollectionProcessor: f"'{collection.dataset_id}'.") use_priority = collection.historical_priority else: - logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired " - f"time range for collection '{collection.dataset_id}'. Skipping.") + logger.debug(f"Granule '{granule}' detected but has already been ingested or is not in desired " + f"time range for collection '{collection.dataset_id}'. 
Skipping.") return dataset_config = self._fill_template(granule, collection, config_template=self._config_template) await self._publisher.publish_message(body=dataset_config, priority=use_priority) - history_manager.push(granule) + await history_manager.push(granule) @staticmethod def _file_supported(file_path: str): diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 8911806..0d5eabd 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -6,7 +6,6 @@ from typing import Dict, Callable, Set, Optional, Awaitable import yaml from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer -from yaml.scanner import ScannerError from collection_manager.entities import Collection from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \ diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py index 50f2170..140ae87 100644 --- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py @@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory): Constructor :param history_path: :param dataset_id: - :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp) + :param signature_fun: function which creates the signature of the cache, + a file path string as argument and returns a string (md5sum, time stamp) """ self._dataset_id = dataset_id self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv') diff --git 
a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py index d92cb24..ef73ccb 100644 --- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py @@ -37,7 +37,7 @@ class IngestionHistory(ABC): _signature_fun = None _latest_ingested_file_update = None - def push(self, file_path: str): + async def push(self, file_path: str): """ Record a file as having been ingested. :param file_path: The full path to the file to record. @@ -46,14 +46,14 @@ class IngestionHistory(ABC): file_path = file_path.strip() file_name = os.path.basename(file_path) signature = self._signature_fun(file_path) - self._push_record(file_name, signature) + await self._push_record(file_name, signature) if not self._latest_ingested_file_update: self._latest_ingested_file_update = os.path.getmtime(file_path) else: self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path)) - self._save_latest_timestamp() + await self._save_latest_timestamp() def latest_ingested_mtime(self) -> Optional[datetime]: """ @@ -65,7 +65,7 @@ class IngestionHistory(ABC): else: return None - def already_ingested(self, file_path: str) -> bool: + async def already_ingested(self, file_path: str) -> bool: """ Return a boolean indicating whether the specified file has already been ingested, based on its signature. :param file_path: The full path of a file to search for in the history. 
@@ -74,12 +74,12 @@ class IngestionHistory(ABC): file_path = file_path.strip() file_name = os.path.basename(file_path) signature = self._signature_fun(file_path) - return signature == self._get_signature(file_name) + return signature == await self._get_signature(file_name) - def get_granule_status(self, - file_path: str, - date_from: datetime = None, - date_to: datetime = None) -> GranuleStatus: + async def get_granule_status(self, + file_path: str, + date_from: datetime = None, + date_to: datetime = None) -> GranuleStatus: """ Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime). @@ -96,21 +96,21 @@ class IngestionHistory(ABC): """ if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()): return GranuleStatus.DESIRED_FORWARD_PROCESSING - elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path): + elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path): return GranuleStatus.DESIRED_HISTORICAL else: return GranuleStatus.UNDESIRED @abstractmethod - def _save_latest_timestamp(self): + async def _save_latest_timestamp(self): pass @abstractmethod - def _push_record(self, file_name, signature): + async def _push_record(self, file_name, signature): pass @abstractmethod - def _get_signature(self, file_name): + async def _get_signature(self, file_name): pass @staticmethod diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py index 79d6eef..59f5cd7 100644 --- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py @@ -4,10 +4,12 @@ import logging import pysolr
import requests +from common.async_utils.AsyncUtils import run_in_executor from collection_manager.services.history_manager.IngestionHistory import IngestionHistory from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath +logging.getLogger("pysolr").setLevel(logging.WARNING) logger = logging.getLogger(__name__) @@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory): def __del__(self): self._req_session.close() + @run_in_executor def _push_record(self, file_name, signature): hash_id = doc_key(self._dataset_id, file_name) self._solr_granules.delete(q=f"id:{hash_id}") @@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory): self._solr_granules.commit() return None + @run_in_executor def _save_latest_timestamp(self): if self._solr_datasets: self._solr_datasets.delete(q=f"id:{self._dataset_id}") @@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory): else: return None + @run_in_executor def _get_signature(self, file_name): hash_id = doc_key(self._dataset_id, file_name) results = self._solr_granules.search(q=f"id:{hash_id}") @@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory): self._add_field(schema_endpoint, "granule_s", "string") self._add_field(schema_endpoint, "granule_signature_s", "string") - else: - logger.info(f"collection {self._granule_collection_name} already exists") - if self._dataset_collection_name not in existing_collections: # Create collection payload = {'action': 'CREATE', @@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory): self._add_field(schema_endpoint, "dataset_s", "string") self._add_field(schema_endpoint, "latest_update_l", "TrieLongField") - else: - logger.info(f"collection {self._dataset_collection_name} already exists") - except requests.exceptions.RequestException as e: logger.error(f"solr instance unreachable {self._solr_url}") raise e diff --git 
a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile index ce1b577..2a57784 100644 --- a/collection_manager/docker/Dockerfile +++ b/collection_manager/docker/Dockerfile @@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list RUN apt-get update && apt-get install -y kubectl -COPY /collection_manager /collection_manager/collection_manager -COPY /setup.py /collection_manager/setup.py -COPY /requirements.txt /collection_manager/requirements.txt -COPY /README.md /collection_manager/README.md -COPY /docker/entrypoint.sh /entrypoint.sh +COPY common /common +COPY collection_manager/collection_manager /collection_manager/collection_manager +COPY collection_manager/setup.py /collection_manager/setup.py +COPY collection_manager/requirements.txt /collection_manager/requirements.txt +COPY collection_manager/README.md /collection_manager/README.md +COPY collection_manager/docker/entrypoint.sh /entrypoint.sh +RUN cd /common && python setup.py install RUN cd /collection_manager && python setup.py install ENTRYPOINT ["/bin/bash", "/entrypoint.sh"] diff --git a/config_operator/config_operator/main.py b/config_operator/config_operator/main.py index fbbbe6b..1df9cf6 100644 --- a/config_operator/config_operator/main.py +++ b/config_operator/config_operator/main.py @@ -4,6 +4,7 @@ import kopf from config_operator.config_source import RemoteGitConfig, LocalDirConfig from config_operator.k8s import K8sConfigMap + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index e098672..65f8b09 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -21,6 +21,7 @@ from asyncio import 
AbstractEventLoop from datetime import datetime from pathlib import Path from typing import Dict +from common import AsyncUtils import pysolr from kazoo.handlers.threading import KazooTimeoutError
