This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch ascending_latitudes in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit ab6998b0e71a745caae080bca48230937bf39818 Author: Eamon Ford <[email protected]> AuthorDate: Tue Aug 4 19:14:22 2020 -0700 SDAP-273: Configure max threads in Granule Ingester (#13) --- granule_ingester/docker/entrypoint.sh | 19 ++++++++------- .../granule_ingester/consumer/Consumer.py | 15 ++++++++---- granule_ingester/granule_ingester/main.py | 24 +++++++++++-------- .../granule_ingester/pipeline/Pipeline.py | 28 ++++++++++++++-------- 4 files changed, 52 insertions(+), 34 deletions(-) diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh index b703ee3..04ed15c 100644 --- a/granule_ingester/docker/entrypoint.sh +++ b/granule_ingester/docker/entrypoint.sh @@ -1,12 +1,13 @@ #!/bin/sh python /sdap/granule_ingester/main.py \ - $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq_host=$RABBITMQ_HOST) \ - $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq_username=$RABBITMQ_USERNAME) \ - $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq_password=$RABBITMQ_PASSWORD) \ - $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \ - $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \ - $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \ - $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \ - $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \ - $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) + $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \ + $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \ + $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq-password=$RABBITMQ_PASSWORD) \ + $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \ + $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra-contact-points=$CASSANDRA_CONTACT_POINTS) \ + $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra-port=$CASSANDRA_PORT) \ + $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra-username=$CASSANDRA_USERNAME) \ + $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra-password=$CASSANDRA_PASSWORD) \ + $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr-host-and-port=$SOLR_HOST_AND_PORT) \ + $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS) diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index 75d347a..5df51fe 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -46,7 +46,7 @@ class Consumer(HealthCheck): connection = await self._get_connection() await connection.close() return True - except: + except Exception: logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string)) return False @@ -64,25 +64,30 @@ class Consumer(HealthCheck): @staticmethod async def _received_message(message: aio_pika.IncomingMessage, data_store_factory, - metadata_store_factory): + metadata_store_factory, + pipeline_max_concurrency: int): logger.info("Received a job from the queue. Starting pipeline.") try: config_str = message.body.decode("utf-8") logger.debug(config_str) pipeline = Pipeline.from_string(config_str=config_str, data_store_factory=data_store_factory, - metadata_store_factory=metadata_store_factory) + metadata_store_factory=metadata_store_factory, + max_concurrency=pipeline_max_concurrency) await pipeline.run() message.ack() except Exception as e: message.reject(requeue=True) logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e)) - async def start_consuming(self): + async def start_consuming(self, pipeline_max_concurrency=16): channel = await self._connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(self._rabbitmq_queue, durable=True) async with queue.iterator() as queue_iter: async for message in queue_iter: - await self._received_message(message, self._data_store_factory, self._metadata_store_factory) + await self._received_message(message, + self._data_store_factory, + self._metadata_store_factory, + pipeline_max_concurrency) diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 9010e33..b54cffd 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -47,43 +47,47 @@ async def run_health_checks(dependencies: List[HealthCheck]): async def main(): parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process ' 'and ingest a granule for each message that comes through.') - parser.add_argument('--rabbitmq_host', + parser.add_argument('--rabbitmq-host', default='localhost', metavar='HOST', help='RabbitMQ hostname to connect to. (Default: "localhost")') - parser.add_argument('--rabbitmq_username', + parser.add_argument('--rabbitmq-username', default='guest', metavar='USERNAME', help='RabbitMQ username. (Default: "guest")') - parser.add_argument('--rabbitmq_password', + parser.add_argument('--rabbitmq-password', default='guest', metavar='PASSWORD', help='RabbitMQ password. (Default: "guest")') - parser.add_argument('--rabbitmq_queue', + parser.add_argument('--rabbitmq-queue', default="nexus", metavar="QUEUE", help='Name of the RabbitMQ queue to consume from. (Default: "nexus")') - parser.add_argument('--cassandra_contact_points', + parser.add_argument('--cassandra-contact-points', default=['localhost'], metavar="HOST", nargs='+', help='List of one or more Cassandra contact points, separated by spaces. (Default: "localhost")') - parser.add_argument('--cassandra_port', + parser.add_argument('--cassandra-port', default=9042, metavar="PORT", help='Cassandra port. (Default: 9042)') - parser.add_argument('--cassandra_username', + parser.add_argument('--cassandra-username', metavar="USERNAME", default=None, help='Cassandra username. Optional.') - parser.add_argument('--cassandra_password', + parser.add_argument('--cassandra-password', metavar="PASSWORD", default=None, help='Cassandra password. Optional.') - parser.add_argument('--solr_host_and_port', + parser.add_argument('--solr-host-and-port', default='http://localhost:8983', metavar='HOST:PORT', help='Solr host and port. (Default: http://localhost:8983)') + parser.add_argument('--max-threads', + default=16, + metavar='MAX_THREADS', + help='Maximum number of threads to use when processing granules. (Default: 16)') parser.add_argument('-v', '--verbose', action='store_true', @@ -126,7 +130,7 @@ async def main(): consumer]): async with consumer: logger.info("All external dependencies have passed the health checks. Now listening to message queue.") - await consumer.start_consuming() + await consumer.start_consuming(args.max_threads) else: logger.error("Quitting because not all dependencies passed the health checks.") diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index 8f2dd6f..e1e53bf 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -15,19 +15,20 @@ import logging +import os import time from typing import List -import aiomultiprocess import xarray as xr import yaml -from nexusproto import DataTile_pb2 as nexusproto +import aiomultiprocess from granule_ingester.granule_loaders import GranuleLoader from granule_ingester.pipeline.Modules import modules as processor_module_mappings from granule_ingester.processors.TileProcessor import TileProcessor from granule_ingester.slicers import TileSlicer from granule_ingester.writers import DataStore, MetadataStore +from nexusproto import DataTile_pb2 as nexusproto logger = logging.getLogger(__name__) @@ -81,36 +82,41 @@ class Pipeline: slicer: TileSlicer, data_store_factory, metadata_store_factory, - tile_processors: List[TileProcessor]): + tile_processors: List[TileProcessor], + max_concurrency: int): self._granule_loader = granule_loader self._tile_processors = tile_processors self._slicer = slicer self._data_store_factory = data_store_factory self._metadata_store_factory = metadata_store_factory + self._max_concurrency = max_concurrency @classmethod - def from_string(cls, config_str: str, data_store_factory, metadata_store_factory): + def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16): config = yaml.load(config_str, yaml.FullLoader) return cls._build_pipeline(config, data_store_factory, metadata_store_factory, - processor_module_mappings) + processor_module_mappings, + max_concurrency) @classmethod - def from_file(cls, config_path: str, data_store_factory, metadata_store_factory): + def from_file(cls, config_path: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16): with open(config_path) as config_file: config = yaml.load(config_file, yaml.FullLoader) return cls._build_pipeline(config, data_store_factory, metadata_store_factory, - processor_module_mappings) + processor_module_mappings, + max_concurrency) @classmethod def _build_pipeline(cls, config: dict, data_store_factory, metadata_store_factory, - module_mappings: dict): + module_mappings: dict, + max_concurrency: int): granule_loader = GranuleLoader(**config['granule']) slicer_config = config['slicer'] @@ -121,7 +127,7 @@ class Pipeline: module = cls._parse_module(processor_config, module_mappings) tile_processors.append(module) - return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors) + return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors, max_concurrency) @classmethod def _parse_module(cls, module_config: dict, module_mappings: dict): @@ -142,7 +148,9 @@ class Pipeline: initargs=(self._tile_processors, dataset, self._data_store_factory, - self._metadata_store_factory)) as pool: + self._metadata_store_factory), + maxtasksperchild=self._max_concurrency, + childconcurrency=self._max_concurrency) as pool: serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in self._slicer.generate_tiles(dataset, granule_name)] # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
