This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch granule-ingester-max-threads
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 51c9b28d92431b1f5405fd5468874518bd96bb6e
Author: Eamon Ford <[email protected]>
AuthorDate: Tue Aug 4 18:02:13 2020 -0700

    add max concurrency
---
 granule_ingester/granule_ingester/pipeline/Pipeline.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..42d9eac 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -15,19 +15,20 @@
 
 
 import logging
+import os
 import time
 from typing import List
 
-import aiomultiprocess
 import xarray as xr
 import yaml
-from nexusproto import DataTile_pb2 as nexusproto
 
+import aiomultiprocess
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
 from granule_ingester.slicers import TileSlicer
 from granule_ingester.writers import DataStore, MetadataStore
+from nexusproto import DataTile_pb2 as nexusproto
 
 logger = logging.getLogger(__name__)
 
@@ -88,6 +89,8 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
+        self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
         config = yaml.load(config_str, yaml.FullLoader)
@@ -142,7 +145,9 @@ class Pipeline:
                                             initargs=(self._tile_processors,
                                                       dataset,
                                                       self._data_store_factory,
-                                                      self._metadata_store_factory)) as pool:
+                                                      self._metadata_store_factory),
+                                            maxtasksperchild=self._max_concurrency,
+                                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that

Reply via email to