XD-DENG commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r219654682
##########
File path: airflow/utils/dag_processing.py
##########
@@ -324,48 +628,322 @@ class DagFileProcessorManager(LoggingMixin):
def __init__(self,
dag_directory,
file_paths,
- parallelism,
- process_file_interval,
max_runs,
- processor_factory):
+ processor_factory,
+ signal_conn,
+ stat_queue,
+ result_queue,
+ async_mode=True):
"""
:param dag_directory: Directory where DAG definitions are kept. All
files in file_paths should be under this directory
:type dag_directory: unicode
:param file_paths: list of file paths that contain DAG definitions
:type file_paths: list[unicode]
- :param parallelism: maximum number of simultaneous process to run at
once
- :type parallelism: int
- :param process_file_interval: process a file at most once every this
- many seconds
- :type process_file_interval: float
:param max_runs: The number of times to parse and schedule each file.
-1
for unlimited.
:type max_runs: int
- :type process_file_interval: float
:param processor_factory: function that creates processors for DAG
definition files. Arguments are (dag_definition_path)
- :type processor_factory: (unicode, unicode) ->
(AbstractDagFileProcessor)
-
+ :type processor_factory: (unicode, unicode, list) ->
(AbstractDagFileProcessor)
+ :param signal_conn: connection to communicate signal with processor
agent.
+ :type signal_conn: Connection
+ :param stat_queue: the queue to use for passing back parsing stat to
agent.
+ :type stat_queue: multiprocessing.Queue
+ :param result_queue: the queue to use for passing back the result to
agent.
+ :type result_queue: multiprocessing.Queue
+ :param async_mode: whether to start the manager in async mode
+ :type async_mode: bool
"""
self._file_paths = file_paths
self._file_path_queue = []
- self._parallelism = parallelism
self._dag_directory = dag_directory
self._max_runs = max_runs
- self._process_file_interval = process_file_interval
self._processor_factory = processor_factory
+ self._signal_conn = signal_conn
+ self._stat_queue = stat_queue
+ self._result_queue = result_queue
+ self._async_mode = async_mode
+
+ self._parallelism = conf.getint('scheduler', 'max_threads')
+ if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and
self._parallelism > 1:
+ self.log.error("Cannot use more than 1 thread when using sqlite. "
+ "Setting parallelism to 1")
+ self._parallelism = 1
+
+ # Parse and schedule each file no faster than this interval.
+ self._file_process_interval = conf.getint('scheduler',
+ 'min_file_process_interval')
+ # How often to print out DAG file processing stats to the log. Default
to
+ # 30 seconds.
+ self.print_stats_interval = conf.getint('scheduler',
+ 'print_stats_interval')
+ # How many seconds do we wait for tasks to heartbeat before mark them
as zombies.
+ self._zombie_threshold_secs = (
+ conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
# Map from file path to the processor
self._processors = {}
# Map from file path to the last runtime
self._last_runtime = {}
# Map from file path to the last finish time
self._last_finish_time = {}
+ self._last_zombie_query_time = timezone.utcnow()
+ # Last time that the DAG dir was traversed to look for files
+ self.last_dag_dir_refresh_time = timezone.utcnow()
+ # Last time stats were printed
+ self.last_stat_print_time = timezone.datetime(2000, 1, 1)
+ self._zombie_query_interval = 10
# Map from file path to the number of runs
self._run_count = defaultdict(int)
- # Scheduler heartbeat key.
+ # Manager heartbeat key.
self._heart_beat_key = 'heart-beat'
+ # How often to scan the DAGs directory for new files. Default to 5
minutes.
+ self.dag_dir_list_interval = conf.getint('scheduler',
+ 'dag_dir_list_interval')
+
+ self._log = logging.getLogger('airflow.processor_manager')
+
+ signal.signal(signal.SIGINT, self._exit_gracefully)
+ signal.signal(signal.SIGTERM, self._exit_gracefully)
+
+ def _exit_gracefully(self, signum, frame):
+ """
+ Helper method to clean up DAG file processors to avoid leaving orphan
processes.
+ """
+ self.log.info("Exiting gracefully with signal {}".format(signum))
+ self.terminate()
+ self.end()
+ self.log.debug("Finished terminating DAG processors.")
+ sys.exit(os.EX_OK)
+
+ def start(self):
+ """
+ Use multiple processes to parse and generate tasks for the
+ DAGs in parallel. By processing them in separate processes,
+ we can get parallelism and isolation from potentially harmful
+ user code.
+ :return:
+ """
+
+ self.log.info("Processing files using up to {} processes at a time "
+ .format(self._parallelism))
+ self.log.info("Process each file at most once every {} seconds"
+ .format(self._file_process_interval))
+ self.log.info("Checking for new files in {} every {} seconds"
+ .format(self._dag_directory, self.dag_dir_list_interval))
+
+ if self._async_mode:
+ self.log.debug("Starting DagFileProcessorManager in async mode")
+ self.start_in_async()
+ else:
+ self.log.debug("Starting DagFileProcessorManager in sync mode")
+ self.start_in_sync()
+
+ def start_in_async(self):
+ """
+ Parse DAG files repeatedly in a standalone loop.
+ """
+ while True:
+ loop_start_time = time.time()
+
+ if self._signal_conn.poll():
+ agent_signal = self._signal_conn.recv()
+ if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
+ self.terminate()
+ break
+ elif agent_signal == DagParsingSignal.END_MANAGER:
+ self.end()
+ sys.exit(os.EX_OK)
+
+ self._refresh_dag_dir()
+
+ simple_dags = self.heartbeat()
+ for simple_dag in simple_dags:
+ self._result_queue.put(simple_dag)
+
+ self._print_stat()
+
+ all_files_processed = all(self.get_last_finish_time(x) is not None
+ for x in self.file_paths)
+ max_runs_reached = self.max_runs_reached()
+
+ dag_parsing_stat = DagParsingStat(self._file_paths,
+ self.get_all_pids(),
+ max_runs_reached,
+ all_files_processed,
+ len(simple_dags))
+ self._stat_queue.put(dag_parsing_stat)
+
+ if max_runs_reached:
+ self.log.info("Exiting dag parsing loop as all files "
+ "have been processed %s times", self._max_runs)
+ break
+
+ loop_duration = time.time() - loop_start_time
+ if loop_duration < 1:
+ sleep_length = 1 - loop_duration
+ self.log.debug("Sleeping for {0:.2f} seconds "
+ "to prevent excessive
logging".format(sleep_length))
+ time.sleep(sleep_length)
+
+ def start_in_sync(self):
+ """
+ Parse DAG files in a loop controlled by DagParsingSignal.
+ Actual DAG parsing loop will run once everyone an agent heartbeats
+ the manager and will report done when finished the loop.
Review comment:
Minor, or maybe I'm wrong: is `"everyone"` here a typo? Should it read "once every time an agent heartbeats the manager"?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services