This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 02df569ded4172c489a1b98498d1b821289999e2
Author: Eamon Ford <[email protected]>
AuthorDate: Tue Jul 14 17:36:08 2020 -0700

    error handling
---
 granule_ingester/granule_ingester/main.py          |  5 ++--
 .../granule_ingester/pipeline/Pipeline.py          | 35 +++++++++++++---------
 2 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/granule_ingester/granule_ingester/main.py 
b/granule_ingester/granule_ingester/main.py
index bb9ad40..751da19 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -46,7 +46,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
     return True
 
 
-async def main():
+async def main(loop):
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for 
granule ingestion instructions, and process '
                                                  'and ingest a granule for 
each message that comes through.')
     parser.add_argument('--rabbitmq_host',
@@ -134,4 +134,5 @@ async def main():
 
 
 if __name__ == '__main__':
-    asyncio.run(main())
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main(loop))
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py 
b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 14dc032..f1aa021 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -19,15 +19,15 @@ import time
 from multiprocessing import Manager
 from typing import List
 
-import aiomultiprocess
 import xarray as xr
 import yaml
+from aiomultiprocess import Pool
 from aiomultiprocess.types import ProxyException
 from nexusproto import DataTile_pb2 as nexusproto
 from tblib import pickling_support
 from yaml.scanner import ScannerError
 
-from granule_ingester.exceptions import PipelineBuildingError
+from granule_ingester.exceptions import PipelineBuildingError, 
LostConnectionError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as 
processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -36,7 +36,9 @@ from granule_ingester.writers import DataStore, MetadataStore
 
 logger = logging.getLogger(__name__)
 
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocessing library has a bug where it never closes out the pool 
if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
 
 _worker_data_store: DataStore = None
 _worker_metadata_store: MetadataStore = None
@@ -97,9 +99,12 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
-        # Create a SyncManager Namespace so that we can to communicate 
exceptions from the
+        # Create a SyncManager so that we can to communicate exceptions from 
the
         # worker processes back to the main process.
-        self._shared_memory = Manager().Namespace()
+        self._manager = Manager()
+
+    def __del__(self):
+        self._manager.shutdown()
 
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, 
metadata_store_factory):
@@ -158,26 +163,28 @@ class Pipeline:
     async def run(self):
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
-            async with aiomultiprocess.Pool(initializer=_init_worker,
-                                            initargs=(self._tile_processors,
-                                                      dataset,
-                                                      self._data_store_factory,
-                                                      
self._metadata_store_factory,
-                                                      self._shared_memory)) as 
pool:
+
+            shared_memory = self._manager.Namespace()
+            async with Pool(initializer=_init_worker,
+                            initargs=(self._tile_processors,
+                                      dataset,
+                                      self._data_store_factory,
+                                      self._metadata_store_factory,
+                                      shared_memory)) as pool:
                 serialized_tiles = 
[nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, 
granule_name)]
                 # aiomultiprocess is built on top of the stdlib 
multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to 
batch it.
-                for chunk in self._chunk_list(serialized_tiles, 
MAX_QUEUE_SIZE):
+                for chunk in self._chunk_list(serialized_tiles, 
MAX_CHUNK_SIZE):
                     try:
                         await pool.map(_process_tile_in_worker, chunk)
                     except ProxyException:
                         pool.terminate()
-                        raise pickle.loads(self._shared_memory.error)
+                        raise pickle.loads(shared_memory.error)
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
 
     @staticmethod
-    def _chunk_list(items, chunk_size):
+    def _chunk_list(items, chunk_size: int):
         return [items[i:i + chunk_size] for i in range(0, len(items), 
chunk_size)]

Reply via email to