eamonford commented on a change in pull request #15:
URL: 
https://github.com/apache/incubator-sdap-ingester/pull/15#discussion_r466073626



##########
File path: granule_ingester/granule_ingester/pipeline/Pipeline.py
##########
@@ -91,43 +101,61 @@ def __init__(self,
         self._metadata_store_factory = metadata_store_factory
         self._max_concurrency = max_concurrency
 
-    @classmethod
-    def from_string(cls, config_str: str, data_store_factory, 
metadata_store_factory, max_concurrency: int = 16):
-        config = yaml.load(config_str, yaml.FullLoader)
-        return cls._build_pipeline(config,
-                                   data_store_factory,
-                                   metadata_store_factory,
-                                   processor_module_mappings,
-                                   max_concurrency)
+        # Create a SyncManager so that we can communicate exceptions from the
+        # worker processes back to the main process.
+        self._manager = Manager()
+
+    def __del__(self):
+        self._manager.shutdown()
 
     @classmethod
-    def from_file(cls, config_path: str, data_store_factory, 
metadata_store_factory, max_concurrency: int = 16):
-        with open(config_path) as config_file:
-            config = yaml.load(config_file, yaml.FullLoader)
+    def from_string(cls, config_str: str, data_store_factory, 
metadata_store_factory, max_concurrency: int = 16):
+        try:
+            config = yaml.load(config_str, yaml.FullLoader)
+            cls._validate_config(config)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
                                        processor_module_mappings,
                                        max_concurrency)
 
+        except yaml.scanner.ScannerError:
+            raise PipelineBuildingError("Cannot build pipeline because of a 
syntax error in the YAML.")
+
+    # TODO: this method should validate the config against an actual schema definition
+    @staticmethod
+    def _validate_config(config: dict):
+        if type(config) is not dict:
+            raise PipelineBuildingError("Cannot build pipeline because the 
config is not valid YAML.")

Review comment:
       Thanks, I will try to come up with a better message here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to