jhtimmins commented on a change in pull request #7128:
URL: https://github.com/apache/airflow/pull/7128#discussion_r417022066
##########
File path: tests/utils/test_dag_processing.py
##########
@@ -131,10 +131,47 @@ def __enter__(self):
def __exit__(self, *exc_info):
# shutil.rmtree(self.settings_root)
# Reset config
+
conf.set('logging', 'logging_config_class', '')
sys.path.remove(self.settings_root)
+class FakeDagFileProcessorRunner(DagFileProcessorProcess):
+    # This fake processor will return the zombies it received in constructor
+    # as its processing result w/o actually parsing anything.
+    def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
+        super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)
+        self._result = zombies, 0
+
+    def start(self):
+        pass
+
+    @property
+    def start_time(self):
+        return DEFAULT_DATE
+
+    @property
+    def pid(self):
+        return 1234
+
+    @property
+    def done(self):
+        return True
+
+    @property
+    def result(self):
+        return self._result
+
+
+def fake_dag_file_processor_factory(file_path, zombies, dag_ids, pickle_dags):
Review comment:
For clarity, it might be worth keeping the argument order consistent with
`FakeDagFileProcessorRunner.__init__`. The preceding line would then be:
`def fake_dag_file_processor_factory(file_path, pickle_dags, dag_ids, zombies):`
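A minimal sketch of the suggestion, assuming the caller of the factory can be adjusted to the new order. `FakeProcessor` is a hypothetical stand-in for `FakeDagFileProcessorRunner` so the snippet runs without Airflow:

```python
class FakeProcessor:
    """Hypothetical stand-in for the fake processor in the diff; records its arguments."""

    def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
        self.file_path = file_path
        self.pickle_dags = pickle_dags
        self.dag_id_white_list = dag_id_white_list
        self.zombies = zombies


def fake_dag_file_processor_factory(file_path, pickle_dags, dag_ids, zombies):
    # Same order as the constructor, so the arguments pass straight through
    # and a reader does not have to map one ordering onto the other.
    return FakeProcessor(file_path, pickle_dags, dag_ids, zombies)


processor = fake_dag_file_processor_factory("dag.py", False, ["example"], [])
```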
##########
File path: tests/utils/test_dag_processing.py
##########
@@ -131,10 +131,47 @@ def __enter__(self):
def __exit__(self, *exc_info):
# shutil.rmtree(self.settings_root)
# Reset config
+
conf.set('logging', 'logging_config_class', '')
sys.path.remove(self.settings_root)
+class FakeDagFileProcessorRunner(DagFileProcessorProcess):
+    # This fake processor will return the zombies it received in constructor
+    # as its processing result w/o actually parsing anything.
+    def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
+        super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)
+        self._result = zombies, 0
Review comment:
It might be worth assigning the zero to a named constant or variable, to
give some context for what it represents.
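Something along these lines, as a sketch only. The diff does not say what the second tuple element means, so the name `FAKE_IMPORT_ERROR_COUNT` is a guess and should be replaced with whatever the value actually represents; `FakeResultHolder` is a hypothetical stand-in for the fake processor:

```python
# Hypothetical name: an assumption about what the second element of the
# result tuple represents, which the diff itself does not state.
FAKE_IMPORT_ERROR_COUNT = 0


class FakeResultHolder:
    """Hypothetical stand-in showing only the result handling."""

    def __init__(self, zombies):
        # The named constant documents the tuple's second slot at the point of use.
        self._result = zombies, FAKE_IMPORT_ERROR_COUNT

    @property
    def result(self):
        return self._result
```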
##########
File path: airflow/utils/dag_processing.py
##########
@@ -332,8 +340,20 @@ def start(self):
"""
Launch DagFileProcessorManager processor and start DAG parsing loop in
manager.
"""
-        self._parent_signal_conn, child_signal_conn = multiprocessing.Pipe()
-        self._process = multiprocessing.Process(
+        if conf.has_option('core', 'mp_start_method'):
+            mp_start_method = conf.get('core', 'mp_start_method')
+        else:
+            mp_start_method = mp.get_start_method()
+
+        possible_value_list = mp.get_all_start_methods()
+        if mp_start_method not in possible_value_list:
+            raise AirflowConfigException(
+                "mp_start_method should not be " + mp_start_method +
+                ". Possible value is one of " + str(possible_value_list))
+        cxt = mp.get_context(mp_start_method)
+
+        self._parent_signal_conn, child_signal_conn = cxt.Pipe()
+        self._process = cxt.Process(
Review comment:
It looks like this block of code is identical to
https://github.com/apache/airflow/pull/7128/files#diff-c35269bcfbbe386e269ffa7487e86192R171-R184.
Since the two copies would probably need to change together if either ever
changed, this might be a good case for a utility mixin. Lines 343-352 could go
into a MultiProcessingConfigMixin class or similar.
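A rough sketch of what such a mixin could look like, not a definitive implementation. To keep it runnable without Airflow, the class attribute `configured_start_method` stands in for the `conf.has_option` / `conf.get('core', 'mp_start_method')` lookup, and `ValueError` stands in for `AirflowConfigException`. The method name `_get_multiprocessing_context` and the `Manager` class are hypothetical:

```python
import multiprocessing as mp


class MultiProcessingConfigMixin:
    """Hypothetical mixin: resolves the start method once and returns a context."""

    # Stand-in for the value read from conf; None means "not configured".
    configured_start_method = None

    def _get_multiprocessing_context(self):
        # Fall back to the interpreter's default start method when unset.
        method = self.configured_start_method or mp.get_start_method()
        allowed = mp.get_all_start_methods()
        if method not in allowed:
            # The diff raises AirflowConfigException here.
            raise ValueError(
                "mp_start_method should not be {}. "
                "Possible value is one of {}".format(method, allowed))
        return mp.get_context(method)


class Manager(MultiProcessingConfigMixin):
    """Hypothetical user of the mixin, mirroring the Pipe() call in the diff."""

    def open_pipe(self):
        ctx = self._get_multiprocessing_context()
        return ctx.Pipe()
```

Both call sites would then ask the mixin for the context instead of repeating the validation.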