This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 11789f4ed7587d481c93aaf8b90b5a6f54b0dfc0 Author: Eamon Ford <[email protected]> AuthorDate: Tue Jul 14 13:52:50 2020 -0700 propagate child worker exceptions up to main process --- .../granule_ingester/exceptions/Exceptions.py | 6 +++ .../granule_ingester/exceptions/__init__.py | 1 + .../granule_ingester/pipeline/Pipeline.py | 53 ++++++++++++++-------- granule_ingester/requirements.txt | 1 + 4 files changed, 41 insertions(+), 20 deletions(-) diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py index c648b99..7741ca6 100644 --- a/granule_ingester/granule_ingester/exceptions/Exceptions.py +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -25,6 +25,10 @@ class SolrLostConnectionError(LostConnectionError): pass +class CassandraConnectionError(Exception): + pass + + class FailedHealthCheckError(Exception): pass @@ -39,3 +43,5 @@ class SolrFailedHealthCheckError(FailedHealthCheckError): class RabbitMQFailedHealthCheckError(FailedHealthCheckError): pass + + diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py index ea0969f..838ccff 100644 --- a/granule_ingester/granule_ingester/exceptions/__init__.py +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -1,3 +1,4 @@ +from .Exceptions import CassandraConnectionError from .Exceptions import CassandraFailedHealthCheckError from .Exceptions import CassandraLostConnectionError from .Exceptions import FailedHealthCheckError diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index e52d99f..14dc032 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -13,20 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. 
- import logging +import pickle import time +from multiprocessing import Manager from typing import List import aiomultiprocess import xarray as xr import yaml from aiomultiprocess.types import ProxyException -from cassandra.cluster import NoHostAvailable from nexusproto import DataTile_pb2 as nexusproto +from tblib import pickling_support from yaml.scanner import ScannerError -from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError +from granule_ingester.exceptions import PipelineBuildingError from granule_ingester.granule_loaders import GranuleLoader from granule_ingester.pipeline.Modules import modules as processor_module_mappings from granule_ingester.processors.TileProcessor import TileProcessor @@ -41,13 +42,15 @@ _worker_data_store: DataStore = None _worker_metadata_store: MetadataStore = None _worker_processor_list: List[TileProcessor] = None _worker_dataset = None +_shared_memory = None -def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory): +def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory): global _worker_data_store global _worker_metadata_store global _worker_processor_list global _worker_dataset + global _shared_memory # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process; # however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry. 
@@ -55,23 +58,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac _worker_metadata_store = metadata_store_factory() _worker_processor_list = processor_list _worker_dataset = dataset + _shared_memory = shared_memory async def _process_tile_in_worker(serialized_input_tile: str): - global _worker_data_store - global _worker_metadata_store - global _worker_processor_list - global _worker_dataset + try: + input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) + processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) - input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) - processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) - - if processed_tile: - # try: - await _worker_data_store.save_data(processed_tile) - await _worker_metadata_store.save_metadata(processed_tile) - # except NoHostAvailable as e: - # logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra") + if processed_tile: + await _worker_data_store.save_data(processed_tile) + await _worker_metadata_store.save_metadata(processed_tile) + except Exception as e: + pickling_support.install(e) + _shared_memory.error = pickle.dumps(e) + raise def _recurse(processor_list: List[TileProcessor], @@ -96,10 +97,15 @@ class Pipeline: self._data_store_factory = data_store_factory self._metadata_store_factory = metadata_store_factory + # Create a SyncManager Namespace so that we can communicate exceptions from the + # worker processes back to the main process. 
+ self._shared_memory = Manager().Namespace() + @classmethod def from_string(cls, config_str: str, data_store_factory, metadata_store_factory): try: config = yaml.load(config_str, yaml.FullLoader) + cls._validate_config(config) return cls._build_pipeline(config, data_store_factory, metadata_store_factory, @@ -108,6 +114,12 @@ class Pipeline: except yaml.scanner.ScannerError: raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.") + # TODO: this method should validate the config against an actual schema definition + @staticmethod + def _validate_config(config: dict): + if type(config) is not dict: + raise PipelineBuildingError("Cannot build pipeline because the config is not valid YAML.") + @classmethod def _build_pipeline(cls, config: dict, @@ -150,17 +162,18 @@ class Pipeline: initargs=(self._tile_processors, dataset, self._data_store_factory, - self._metadata_store_factory)) as pool: + self._metadata_store_factory, + self._shared_memory)) as pool: serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in self._slicer.generate_tiles(dataset, granule_name)] # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that # a queue can't have more than 2**15-1 tasks. So, we have to batch it. 
- for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE): + for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE): try: await pool.map(_process_tile_in_worker, chunk) except ProxyException: pool.terminate() - raise PipelineRunningError("Running the pipeline failed and could not recover.") + raise pickle.loads(self._shared_memory.error) end = time.perf_counter() logger.info("Pipeline finished in {} seconds".format(end - start)) diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt index a6d64a2..0479f99 100644 --- a/granule_ingester/requirements.txt +++ b/granule_ingester/requirements.txt @@ -1,3 +1,4 @@ cassandra-driver==3.23.0 aiomultiprocess==0.7.0 aioboto3 +tblib==1.6.0 \ No newline at end of file
