This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch granule-ingester-max-threads in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 51c9b28d92431b1f5405fd5468874518bd96bb6e Author: Eamon Ford <[email protected]> AuthorDate: Tue Aug 4 18:02:13 2020 -0700 add max concurrency --- granule_ingester/granule_ingester/pipeline/Pipeline.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index 8f2dd6f..42d9eac 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -15,19 +15,20 @@ import logging +import os import time from typing import List -import aiomultiprocess import xarray as xr import yaml -from nexusproto import DataTile_pb2 as nexusproto +import aiomultiprocess from granule_ingester.granule_loaders import GranuleLoader from granule_ingester.pipeline.Modules import modules as processor_module_mappings from granule_ingester.processors.TileProcessor import TileProcessor from granule_ingester.slicers import TileSlicer from granule_ingester.writers import DataStore, MetadataStore +from nexusproto import DataTile_pb2 as nexusproto logger = logging.getLogger(__name__) @@ -88,6 +89,8 @@ class Pipeline: self._data_store_factory = data_store_factory self._metadata_store_factory = metadata_store_factory + self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16)) + @classmethod def from_string(cls, config_str: str, data_store_factory, metadata_store_factory): config = yaml.load(config_str, yaml.FullLoader) @@ -142,7 +145,9 @@ class Pipeline: initargs=(self._tile_processors, dataset, self._data_store_factory, - self._metadata_store_factory)) as pool: + self._metadata_store_factory), + maxtasksperchild=self._max_concurrency, + childconcurrency=self._max_concurrency) as pool: serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in self._slicer.generate_tiles(dataset, granule_name)] # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
