This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 1ab7753db0aa84e4a7ce8c25a48c582f61e6f5ff Author: Eamon Ford <[email protected]> AuthorDate: Tue Jul 14 13:52:50 2020 -0700 propagate child worker exceptions up to main process --- .../granule_ingester/exceptions/Exceptions.py | 6 +++ .../granule_ingester/exceptions/__init__.py | 1 + .../granule_ingester/pipeline/Pipeline.py | 53 ++++++++++++++-------- .../granule_ingester/writers/CassandraStore.py | 17 ++++--- granule_ingester/requirements.txt | 1 + 5 files changed, 52 insertions(+), 26 deletions(-) diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py index 6e7d89a..f43bc2f 100644 --- a/granule_ingester/granule_ingester/exceptions/Exceptions.py +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -14,6 +14,10 @@ class RabbitMQConnectionError(Exception): pass +class CassandraConnectionError(Exception): + pass + + class FailedHealthCheckError(Exception): pass @@ -28,3 +32,5 @@ class SolrFailedHealthCheckError(FailedHealthCheckError): class RabbitMQFailedHealthCheckError(FailedHealthCheckError): pass + + diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py index 2ba1b4a..400c9bf 100644 --- a/granule_ingester/granule_ingester/exceptions/__init__.py +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -1,3 +1,4 @@ +from .Exceptions import CassandraConnectionError from .Exceptions import CassandraFailedHealthCheckError from .Exceptions import FailedHealthCheckError from .Exceptions import PipelineBuildingError diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index e52d99f..14dc032 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -13,20 +13,21 @@ # See the License for the specific language governing permissions and # limitations 
under the License. - import logging +import pickle import time +from multiprocessing import Manager from typing import List import aiomultiprocess import xarray as xr import yaml from aiomultiprocess.types import ProxyException -from cassandra.cluster import NoHostAvailable from nexusproto import DataTile_pb2 as nexusproto +from tblib import pickling_support from yaml.scanner import ScannerError -from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError +from granule_ingester.exceptions import PipelineBuildingError from granule_ingester.granule_loaders import GranuleLoader from granule_ingester.pipeline.Modules import modules as processor_module_mappings from granule_ingester.processors.TileProcessor import TileProcessor @@ -41,13 +42,15 @@ _worker_data_store: DataStore = None _worker_metadata_store: MetadataStore = None _worker_processor_list: List[TileProcessor] = None _worker_dataset = None +_shared_memory = None -def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory): +def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory): global _worker_data_store global _worker_metadata_store global _worker_processor_list global _worker_dataset + global _shared_memory # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process; # however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry. 
@@ -55,23 +58,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac _worker_metadata_store = metadata_store_factory() _worker_processor_list = processor_list _worker_dataset = dataset + _shared_memory = shared_memory async def _process_tile_in_worker(serialized_input_tile: str): - global _worker_data_store - global _worker_metadata_store - global _worker_processor_list - global _worker_dataset + try: + input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) + processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) - input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) - processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) - - if processed_tile: - # try: - await _worker_data_store.save_data(processed_tile) - await _worker_metadata_store.save_metadata(processed_tile) - # except NoHostAvailable as e: - # logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra") + if processed_tile: + await _worker_data_store.save_data(processed_tile) + await _worker_metadata_store.save_metadata(processed_tile) + except Exception as e: + pickling_support.install(e) + _shared_memory.error = pickle.dumps(e) + raise def _recurse(processor_list: List[TileProcessor], @@ -96,10 +97,15 @@ class Pipeline: self._data_store_factory = data_store_factory self._metadata_store_factory = metadata_store_factory + # Create a SyncManager Namespace so that we can communicate exceptions from the + # worker processes back to the main process.
+ self._shared_memory = Manager().Namespace() + @classmethod def from_string(cls, config_str: str, data_store_factory, metadata_store_factory): try: config = yaml.load(config_str, yaml.FullLoader) + cls._validate_config(config) return cls._build_pipeline(config, data_store_factory, metadata_store_factory, @@ -108,6 +114,12 @@ class Pipeline: except yaml.scanner.ScannerError: raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.") + # TODO: this method should validate the config against an actual schema definition + @staticmethod + def _validate_config(config: dict): + if type(config) is not dict: + raise PipelineBuildingError("Cannot build pipeline because the config is not valid YAML.") + @classmethod def _build_pipeline(cls, config: dict, @@ -150,17 +162,18 @@ class Pipeline: initargs=(self._tile_processors, dataset, self._data_store_factory, - self._metadata_store_factory)) as pool: + self._metadata_store_factory, + self._shared_memory)) as pool: serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in self._slicer.generate_tiles(dataset, granule_name)] # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that # a queue can't have more than 2**15-1 tasks. So, we have to batch it. 
- for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE): + for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE): try: await pool.map(_process_tile_in_worker, chunk) except ProxyException: pool.terminate() - raise PipelineRunningError("Running the pipeline failed and could not recover.") + raise pickle.loads(self._shared_memory.error) end = time.perf_counter() logger.info("Pipeline finished in {} seconds".format(end - start)) diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py index 530871d..fbb5a7d 100644 --- a/granule_ingester/granule_ingester/writers/CassandraStore.py +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -21,9 +21,10 @@ import uuid from cassandra.cluster import Cluster, Session, NoHostAvailable from cassandra.cqlengine import columns from cassandra.cqlengine.models import Model +from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy from nexusproto.DataTile_pb2 import NexusTile, TileData -from granule_ingester.exceptions import CassandraFailedHealthCheckError +from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraConnectionError from granule_ingester.writers.DataStore import DataStore logging.getLogger('cassandra').setLevel(logging.INFO) @@ -52,7 +53,11 @@ class CassandraStore(DataStore): raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!") def _get_session(self) -> Session: - cluster = Cluster(contact_points=self._contact_points, port=self._port) + cluster = Cluster(contact_points=self._contact_points, + port=self._port, + # load_balancing_policy= + reconnection_policy=ConstantReconnectionPolicy(delay=5.0), + default_retry_policy=RetryPolicy()) session = cluster.connect() session.set_keyspace('nexustiles') return session @@ -69,10 +74,10 @@ class CassandraStore(DataStore): tile_id = uuid.UUID(tile.summary.tile_id) serialized_tile_data = 
TileData.SerializeToString(tile.tile) prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") - await type(self)._execute_query_async(self._session, prepared_query, - [tile_id, bytearray(serialized_tile_data)]) - except NoHostAvailable as e: - logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}") + await self._execute_query_async(self._session, prepared_query, + [tile_id, bytearray(serialized_tile_data)]) + except Exception: + raise CassandraConnectionError(f"Cannot connect to Cassandra to save tile.") @staticmethod async def _execute_query_async(session: Session, query, parameters=None): diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt index a6d64a2..0479f99 100644 --- a/granule_ingester/requirements.txt +++ b/granule_ingester/requirements.txt @@ -1,3 +1,4 @@ cassandra-driver==3.23.0 aiomultiprocess==0.7.0 aioboto3 +tblib==1.6.0 \ No newline at end of file
