ashb commented on a change in pull request #3873: [Airflow-2760] Decouple DAG
parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r219593507
##########
File path: airflow/utils/dag_processing.py
##########
@@ -324,48 +628,322 @@ class DagFileProcessorManager(LoggingMixin):
def __init__(self,
dag_directory,
file_paths,
- parallelism,
- process_file_interval,
max_runs,
- processor_factory):
+ processor_factory,
+ signal_conn,
+ stat_queue,
+ result_queue,
+ async_mode=True):
"""
:param dag_directory: Directory where DAG definitions are kept. All
files in file_paths should be under this directory
:type dag_directory: unicode
:param file_paths: list of file paths that contain DAG definitions
:type file_paths: list[unicode]
- :param parallelism: maximum number of simultaneous process to run at
once
- :type parallelism: int
- :param process_file_interval: process a file at most once every this
- many seconds
- :type process_file_interval: float
:param max_runs: The number of times to parse and schedule each file.
-1
for unlimited.
:type max_runs: int
- :type process_file_interval: float
:param processor_factory: function that creates processors for DAG
definition files. Arguments are (dag_definition_path)
- :type processor_factory: (unicode, unicode) ->
(AbstractDagFileProcessor)
-
+ :type processor_factory: (unicode, unicode, list) ->
(AbstractDagFileProcessor)
+ :param signal_conn: connection to communicate signal with processor
agent.
+ :type signal_conn: Connection
+ :param stat_queue: the queue to use for passing back parsing stat to
agent.
+ :type stat_queue: multiprocessing.Queue
+ :param result_queue: the queue to use for passing back the result to
agent.
+ :type result_queue: multiprocessing.Queue
+ :param async_mode: whether to start the manager in async mode
+ :type async_mode: bool
"""
self._file_paths = file_paths
self._file_path_queue = []
- self._parallelism = parallelism
self._dag_directory = dag_directory
self._max_runs = max_runs
- self._process_file_interval = process_file_interval
self._processor_factory = processor_factory
+ self._signal_conn = signal_conn
+ self._stat_queue = stat_queue
+ self._result_queue = result_queue
+ self._async_mode = async_mode
+
+ self._parallelism = conf.getint('scheduler', 'max_threads')
+ if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and
self._parallelism > 1:
+ self.log.error("Cannot use more than 1 thread when using sqlite. "
+ "Setting parallelism to 1")
+ self._parallelism = 1
+
+ # Parse and schedule each file no faster than this interval.
+ self._file_process_interval = conf.getint('scheduler',
+ 'min_file_process_interval')
+ # How often to print out DAG file processing stats to the log. Default
to
+ # 30 seconds.
+ self.print_stats_interval = conf.getint('scheduler',
+ 'print_stats_interval')
+ # How many seconds do we wait for tasks to heartbeat before mark them
as zombies.
+ self._zombie_threshold_secs = (
+ conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
# Map from file path to the processor
self._processors = {}
# Map from file path to the last runtime
self._last_runtime = {}
# Map from file path to the last finish time
self._last_finish_time = {}
+ self._last_zombie_query_time = timezone.utcnow()
+ # Last time that the DAG dir was traversed to look for files
+ self.last_dag_dir_refresh_time = timezone.utcnow()
+ # Last time stats were printed
+ self.last_stat_print_time = timezone.datetime(2000, 1, 1)
+ self._zombie_query_interval = 10
# Map from file path to the number of runs
self._run_count = defaultdict(int)
- # Scheduler heartbeat key.
+ # Manager heartbeat key.
self._heart_beat_key = 'heart-beat'
+ # How often to scan the DAGs directory for new files. Default to 5
minutes.
+ self.dag_dir_list_interval = conf.getint('scheduler',
+ 'dag_dir_list_interval')
+
+ self._log = logging.getLogger('airflow.processor_manager')
+
+ signal.signal(signal.SIGINT, self._exit_gracefully)
+ signal.signal(signal.SIGTERM, self._exit_gracefully)
+
+ def _exit_gracefully(self, signum, frame):
+ """
+ Helper method to clean up DAG file processors to avoid leaving orphan
processes.
+ """
+ self.log.info("Exiting gracefully with signal {}".format(signum))
Review comment:
Nit: please word this as "upon receiving signal" (and make the same change in both places where this message is used).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services