[ 
https://issues.apache.org/jira/browse/AIRFLOW-2760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664896#comment-16664896
 ] 

ASF GitHub Bot commented on AIRFLOW-2760:
-----------------------------------------

kaxil closed pull request #3873: [AIRFLOW-2760] Decouple DAG parsing loop from 
scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/UPDATING.md b/UPDATING.md
index cb5c5b7d48..541a394949 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -5,6 +5,16 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### New `dag_processor_manager_log_location` config option
+
+The DAG parsing manager log now by default will be log into a file, where its 
location is
+controlled by the new `dag_processor_manager_log_location` config option in 
core section.
+
+### new `sync_parallelism` config option in celery section
+
+The new `sync_parallelism` config option will control how many processes 
CeleryExecutor will use to
+fetch celery task state in parallel. Default value is max(1, number of cores - 
1)
+
 ### Rename of BashTaskRunner to StandardTaskRunner
 
 BashTaskRunner has been renamed to StandardTaskRunner. It is the default task 
runner
@@ -26,11 +36,6 @@ We also provide a new cli command(``sync_perm``) to allow 
admin to auto sync per
 The scheduler.min_file_parsing_loop_time config option has been temporarily 
removed due to
 some bugs.
 
-### new `sync_parallelism` config option in celery section
-
-The new `sync_parallelism` config option will control how many processes 
CeleryExecutor will use to
-fetch celery task state in parallel. Default value is max(1, number of cores - 
1)
-
 ### CLI Changes
 
 The ability to manipulate users from the command line has been changed. 
'airflow create_user' and 'airflow delete_user' and 'airflow list_users' has 
been grouped to a single command `airflow users` with optional flags 
`--create`, `--list` and `--delete`.
@@ -185,11 +190,11 @@ With Airflow 1.9 or lower, `FILENAME_TEMPLATE`, 
`PROCESSOR_FILENAME_TEMPLATE`, `
 ```
 [core]
 fab_logging_level = WARN
-log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ 
try_number }}.log
-log_processor_filename_template = {{ filename }}.log
+log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts 
}}}}/{{{{ try_number }}}}.log
+log_processor_filename_template = {{{{ filename }}}}.log
 
 [elasticsearch]
-elasticsearch_log_id_template = 
{dag_id}-{task_id}-{execution_date}-{try_number}
+elasticsearch_log_id_template = 
{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
 elasticsearch_end_of_log_mark = end_of_log
 ```
 
diff --git a/airflow/config_templates/airflow_local_settings.py 
b/airflow/config_templates/airflow_local_settings.py
index 95150ab3bb..45a2f2923c 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -20,6 +20,7 @@
 import os
 
 from airflow import configuration as conf
+from airflow.utils.file import mkdirs
 
 # TODO: Logging format and level should be configured
 # in this file instead of from airflow.cfg. Currently
@@ -38,7 +39,11 @@
 
 PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
 
+DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
+    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
+
 FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
+
 PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 
'LOG_PROCESSOR_FILENAME_TEMPLATE')
 
 # Storage bucket url for remote logging
@@ -79,7 +84,7 @@
             'formatter': 'airflow',
             'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
             'filename_template': PROCESSOR_FILENAME_TEMPLATE,
-        },
+        }
     },
     'loggers': {
         'airflow.processor': {
@@ -104,6 +109,26 @@
     }
 }
 
+DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
+    'handlers': {
+        'processor_manager': {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'formatter': 'airflow',
+            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
+            'mode': 'a',
+            'maxBytes': 104857600,  # 100MB
+            'backupCount': 5
+        }
+    },
+    'loggers': {
+        'airflow.processor_manager': {
+            'handlers': ['processor_manager'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        }
+    }
+}
+
 REMOTE_HANDLERS = {
     's3': {
         'task': {
@@ -172,6 +197,22 @@
 
 REMOTE_LOGGING = conf.get('core', 'remote_logging')
 
+# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is 
set.
+# This is to avoid exceptions when initializing RotatingFileHandler multiple 
times
+# in multiple processes.
+if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
+    DEFAULT_LOGGING_CONFIG['handlers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
+    DEFAULT_LOGGING_CONFIG['loggers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
+
+    # Manually create log directory for processor_manager handler as 
RotatingFileHandler
+    # will only create file but not the directory.
+    processor_manager_handler_config = 
DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][
+        'processor_manager']
+    directory = os.path.dirname(processor_manager_handler_config['filename'])
+    mkdirs(directory, 0o755)
+
 if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
         DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
 elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index b572dbb2f7..c1fda6d00e 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -68,6 +68,7 @@ simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
 # Log filename format
 log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts 
}}}}/{{{{ try_number }}}}.log
 log_processor_filename_template = {{{{ filename }}}}.log
+dag_processor_manager_log_location = 
{AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
 
 # Hostname by providing a path to a callable, which will resolve the hostname
 hostname_callable = socket:getfqdn
diff --git a/airflow/config_templates/default_test.cfg 
b/airflow/config_templates/default_test.cfg
index f9279cce54..6baec130b3 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -39,6 +39,7 @@ logging_level = INFO
 fab_logging_level = WARN
 log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts 
}}}}/{{{{ try_number }}}}.log
 log_processor_filename_template = {{{{ filename }}}}.log
+dag_processor_manager_log_location = 
{AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
 executor = SequentialExecutor
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
 load_examples = True
diff --git a/airflow/executors/base_executor.py 
b/airflow/executors/base_executor.py
index a989dc4408..d46b2dc0dc 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -141,7 +141,7 @@ def heartbeat(self):
                                    queue=queue,
                                    executor_config=ti.executor_config)
             else:
-                self.logger.info(
+                self.log.info(
                     'Task is already running, not sending to '
                     'executor: {}'.format(key))
 
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0bcb131c72..9ddf92ff44 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -26,22 +26,18 @@
 import logging
 import multiprocessing
 import os
-import psutil
 import signal
-import six
 import sys
 import threading
 import time
-import datetime
-
 from collections import defaultdict
+from time import sleep
+
+import six
 from past.builtins import basestring
-from sqlalchemy import (
-    Column, Integer, String, func, Index, or_, and_, not_)
+from sqlalchemy import (Column, Index, Integer, String, and_, func, not_, or_)
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
-from tabulate import tabulate
-from time import sleep
 
 from airflow import configuration as conf
 from airflow import executors, models, settings
@@ -53,16 +49,16 @@
 from airflow.utils import asciiart, helpers, timezone
 from airflow.utils.configuration import tmp_configuration_copy
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,
-                                          DagFileProcessorManager,
+                                          DagFileProcessorAgent,
                                           SimpleDag,
                                           SimpleDagBag,
                                           list_py_file_paths)
 from airflow.utils.db import create_session, provide_session
-from airflow.utils.email import send_email, get_email_address_list
-from airflow.utils.log.logging_mixin import LoggingMixin, set_context, 
StreamLogWriter
+from airflow.utils.email import get_email_address_list, send_email
+from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, 
set_context
 from airflow.utils.net import get_hostname
-from airflow.utils.state import State
 from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.state import State
 
 Base = models.Base
 ID_LEN = models.ID_LEN
@@ -304,7 +300,7 @@ class DagFileProcessor(AbstractDagFileProcessor, 
LoggingMixin):
     # Counter that increments everytime an instance of this class is created
     class_creation_counter = 0
 
-    def __init__(self, file_path, pickle_dags, dag_id_white_list):
+    def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
         """
         :param file_path: a Python file containing Airflow DAG definitions
         :type file_path: unicode
@@ -312,6 +308,8 @@ def __init__(self, file_path, pickle_dags, 
dag_id_white_list):
         :type pickle_dags: bool
         :param dag_id_whitelist: If specified, only look at these DAG ID's
         :type dag_id_whitelist: list[unicode]
+        :param zombies: zombie task instances to kill
+        :type zombies: list[SimpleTaskInstance]
         """
         self._file_path = file_path
         # Queue that's used to pass results from the child process.
@@ -320,6 +318,7 @@ def __init__(self, file_path, pickle_dags, 
dag_id_white_list):
         self._process = None
         self._dag_id_white_list = dag_id_white_list
         self._pickle_dags = pickle_dags
+        self._zombies = zombies
         # The result of Scheduler.process_file(file_path).
         self._result = None
         # Whether the process is done running.
@@ -340,7 +339,8 @@ def _launch_process(result_queue,
                         file_path,
                         pickle_dags,
                         dag_id_white_list,
-                        thread_name):
+                        thread_name,
+                        zombies):
         """
         Launch a process to process the given file.
 
@@ -358,6 +358,8 @@ def _launch_process(result_queue,
         :type thread_name: unicode
         :return: the process that was launched
         :rtype: multiprocessing.Process
+        :param zombies: zombie task instances to kill
+        :type zombies: list[SimpleTaskInstance]
         """
         def helper():
             # This helper runs in the newly created process
@@ -386,6 +388,7 @@ def helper():
                          os.getpid(), file_path)
                 scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, 
log=log)
                 result = scheduler_job.process_file(file_path,
+                                                    zombies,
                                                     pickle_dags)
                 result_queue.put(result)
                 end_time = time.time()
@@ -418,7 +421,8 @@ def start(self):
             self.file_path,
             self._pickle_dags,
             self._dag_id_white_list,
-            "DagFileProcessor{}".format(self._instance_id))
+            "DagFileProcessor{}".format(self._instance_id),
+            self._zombies)
         self._start_time = timezone.utcnow()
 
     def terminate(self, sigkill=False):
@@ -475,7 +479,8 @@ def done(self):
         if self._done:
             return True
 
-        if not self._result_queue.empty():
+        # In case result queue is corrupted.
+        if self._result_queue and not self._result_queue.empty():
             self._result = self._result_queue.get_nowait()
             self._done = True
             self.log.debug("Waiting for %s", self._process)
@@ -483,7 +488,7 @@ def done(self):
             return True
 
         # Potential error case when process dies
-        if not self._process.is_alive():
+        if self._result_queue and not self._process.is_alive():
             self._done = True
             # Get the object from the queue or else join() can hang.
             if not self._result_queue.empty():
@@ -534,8 +539,6 @@ def __init__(
             dag_ids=None,
             subdir=settings.DAGS_FOLDER,
             num_runs=-1,
-            file_process_interval=conf.getint('scheduler',
-                                              'min_file_process_interval'),
             processor_poll_interval=1.0,
             run_duration=None,
             do_pickle=False,
@@ -584,26 +587,27 @@ def __init__(
 
         self.using_sqlite = False
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
-            if self.max_threads > 1:
-                self.log.error("Cannot use more than 1 thread when using 
sqlite. Setting max_threads to 1")
-            self.max_threads = 1
             self.using_sqlite = True
 
-        # How often to scan the DAGs directory for new files. Default to 5 
minutes.
-        self.dag_dir_list_interval = conf.getint('scheduler',
-                                                 'dag_dir_list_interval')
-        # How often to print out DAG file processing stats to the log. Default 
to
-        # 30 seconds.
-        self.print_stats_interval = conf.getint('scheduler',
-                                                'print_stats_interval')
-
-        self.file_process_interval = file_process_interval
-
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         if run_duration is None:
             self.run_duration = conf.getint('scheduler',
                                             'run_duration')
 
+        self.processor_agent = None
+
+        signal.signal(signal.SIGINT, self._exit_gracefully)
+        signal.signal(signal.SIGTERM, self._exit_gracefully)
+
+    def _exit_gracefully(self, signum, frame):
+        """
+        Helper method to clean up processor_agent to avoid leaving orphan 
processes.
+        """
+        self.log.info("Exiting gracefully upon receiving signal 
{}".format(signum))
+        if self.processor_agent:
+            self.processor_agent.end()
+        sys.exit(os.EX_OK)
+
     @provide_session
     def manage_slas(self, dag, session=None):
         """
@@ -739,25 +743,6 @@ def manage_slas(self, dag, session=None):
                     session.merge(sla)
             session.commit()
 
-    @staticmethod
-    @provide_session
-    def clear_nonexistent_import_errors(session, known_file_paths):
-        """
-        Clears import errors for files that no longer exist.
-
-        :param session: session for ORM operations
-        :type session: sqlalchemy.orm.session.Session
-        :param known_file_paths: The list of existing files that are parsed 
for DAGs
-        :type known_file_paths: list[unicode]
-        """
-        query = session.query(models.ImportError)
-        if known_file_paths:
-            query = query.filter(
-                ~models.ImportError.filename.in_(known_file_paths)
-            )
-        query.delete(synchronize_session='fetch')
-        session.commit()
-
     @staticmethod
     def update_import_errors(session, dagbag):
         """
@@ -1112,7 +1097,9 @@ def _find_executable_task_instances(self, simple_dag_bag, 
states, session=None):
         # Put one task instance on each line
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in task_instances_to_examine])
-        self.log.info("Tasks up for execution:\n\t%s", task_instance_str)
+        self.log.info("{} tasks up for execution:\n\t{}"
+                      .format(len(task_instances_to_examine),
+                              task_instance_str))
 
         # Get the pool settings
         pools = {p.pool: p for p in session.query(models.Pool).all()}
@@ -1303,8 +1290,9 @@ def query(result, items):
                 .all())
             task_instance_str = "\n\t".join(
                 ["{}".format(x) for x in tis_to_be_queued])
-            self.log.info("Setting the follow tasks to queued state:\n\t%s",
-                          task_instance_str)
+            self.log.info("Setting the following {} tasks to queued 
state:\n\t{}"
+                          .format(len(tis_to_be_queued),
+                                  task_instance_str))
             return result + tis_to_be_queued
 
         tis_to_be_queued = helpers.reduce_in_chunks(query,
@@ -1481,72 +1469,6 @@ def _process_executor_events(self, simple_dag_bag, 
session=None):
                         session.merge(ti)
                         session.commit()
 
-    def _log_file_processing_stats(self,
-                                   known_file_paths,
-                                   processor_manager):
-        """
-        Print out stats about how files are getting processed.
-
-        :param known_file_paths: a list of file paths that may contain Airflow
-            DAG definitions
-        :type known_file_paths: list[unicode]
-        :param processor_manager: manager for the file processors
-        :type stats: DagFileProcessorManager
-        :return: None
-        """
-
-        # File Path: Path to the file containing the DAG definition
-        # PID: PID associated with the process that's processing the file. May
-        # be empty.
-        # Runtime: If the process is currently running, how long it's been
-        # running for in seconds.
-        # Last Runtime: If the process ran before, how long did it take to
-        # finish in seconds
-        # Last Run: When the file finished processing in the previous run.
-        headers = ["File Path",
-                   "PID",
-                   "Runtime",
-                   "Last Runtime",
-                   "Last Run"]
-
-        rows = []
-        for file_path in known_file_paths:
-            last_runtime = processor_manager.get_last_runtime(file_path)
-            processor_pid = processor_manager.get_pid(file_path)
-            processor_start_time = processor_manager.get_start_time(file_path)
-            runtime = ((timezone.utcnow() - 
processor_start_time).total_seconds()
-                       if processor_start_time else None)
-            last_run = processor_manager.get_last_finish_time(file_path)
-
-            rows.append((file_path,
-                         processor_pid,
-                         runtime,
-                         last_runtime,
-                         last_run))
-
-        # Sort by longest last runtime. (Can't sort None values in python3)
-        rows = sorted(rows, key=lambda x: x[3] or 0.0)
-
-        formatted_rows = []
-        for file_path, pid, runtime, last_runtime, last_run in rows:
-            formatted_rows.append((file_path,
-                                   pid,
-                                   "{:.2f}s".format(runtime)
-                                   if runtime else None,
-                                   "{:.2f}s".format(last_runtime)
-                                   if last_runtime else None,
-                                   last_run.strftime("%Y-%m-%dT%H:%M:%S")
-                                   if last_run else None))
-        log_str = ("\n" +
-                   "=" * 80 +
-                   "\n" +
-                   "DAG File Processing Stats\n\n" +
-                   tabulate(formatted_rows, headers=headers) +
-                   "\n" +
-                   "=" * 80)
-
-        self.log.info(log_str)
-
     def _execute(self):
         self.log.info("Starting the scheduler")
 
@@ -1556,84 +1478,51 @@ def _execute(self):
                 (executors.LocalExecutor, executors.SequentialExecutor):
             pickle_dags = True
 
-        # Use multiple processes to parse and generate tasks for the
-        # DAGs in parallel. By processing them in separate processes,
-        # we can get parallelism and isolation from potentially harmful
-        # user code.
-        self.log.info(
-            "Processing files using up to %s processes at a time",
-            self.max_threads)
         self.log.info("Running execute loop for %s seconds", self.run_duration)
         self.log.info("Processing each file at most %s times", self.num_runs)
-        self.log.info(
-            "Process each file at most once every %s seconds",
-            self.file_process_interval)
-        self.log.info(
-            "Checking for new files in %s every %s seconds",
-            self.subdir,
-            self.dag_dir_list_interval)
 
         # Build up a list of Python files that could contain DAGs
         self.log.info("Searching for files in %s", self.subdir)
         known_file_paths = list_py_file_paths(self.subdir)
         self.log.info("There are %s files in %s", len(known_file_paths), 
self.subdir)
 
-        def processor_factory(file_path):
+        def processor_factory(file_path, zombies):
             return DagFileProcessor(file_path,
                                     pickle_dags,
-                                    self.dag_ids)
+                                    self.dag_ids,
+                                    zombies)
+
+        # When using sqlite, we do not use async_mode
+        # so the scheduler job and DAG parser don't access the DB at the same 
time.
+        async_mode = not self.using_sqlite
 
-        processor_manager = DagFileProcessorManager(self.subdir,
-                                                    known_file_paths,
-                                                    self.max_threads,
-                                                    self.file_process_interval,
-                                                    self.num_runs,
-                                                    processor_factory)
+        self.processor_agent = DagFileProcessorAgent(self.subdir,
+                                                     known_file_paths,
+                                                     self.num_runs,
+                                                     processor_factory,
+                                                     async_mode)
 
         try:
-            self._execute_helper(processor_manager)
+            self._execute_helper()
         finally:
+            self.processor_agent.end()
             self.log.info("Exited execute loop")
 
-            # Kill all child processes on exit since we don't want to leave
-            # them as orphaned.
-            pids_to_kill = processor_manager.get_all_pids()
-            if len(pids_to_kill) > 0:
-                # First try SIGTERM
-                this_process = psutil.Process(os.getpid())
-                # Only check child processes to ensure that we don't have a 
case
-                # where we kill the wrong process because a child process died
-                # but the PID got reused.
-                child_processes = [x for x in 
this_process.children(recursive=True)
-                                   if x.is_running() and x.pid in pids_to_kill]
-                for child in child_processes:
-                    self.log.info("Terminating child PID: %s", child.pid)
-                    child.terminate()
-                # TODO: Remove magic number
-                timeout = 5
-                self.log.info(
-                    "Waiting up to %s seconds for processes to exit...", 
timeout)
-                try:
-                    psutil.wait_procs(
-                        child_processes, timeout=timeout,
-                        callback=lambda x: self.log.info('Terminated PID %s', 
x.pid))
-                except psutil.TimeoutExpired:
-                    self.log.debug("Ran out of time while waiting for 
processes to exit")
-
-                # Then SIGKILL
-                child_processes = [x for x in 
this_process.children(recursive=True)
-                                   if x.is_running() and x.pid in pids_to_kill]
-                if len(child_processes) > 0:
-                    self.log.info("SIGKILL processes that did not terminate 
gracefully")
-                    for child in child_processes:
-                        self.log.info("Killing child PID: %s", child.pid)
-                        child.kill()
-                        child.wait()
-
-    def _execute_helper(self, processor_manager):
-        """
-        :param processor_manager: manager to use
-        :type processor_manager: DagFileProcessorManager
+    def _execute_helper(self):
+        """
+        The actual scheduler loop. The main steps in the loop are:
+            #. Harvest DAG parsing results through DagFileProcessorAgent
+            #. Find and queue executable tasks
+                #. Change task instance state in DB
+                #. Queue tasks in executor
+            #. Heartbeat executor
+                #. Execute queued tasks in executor asynchronously
+                #. Sync on the states of running tasks
+
+        Following is a graphic representation of these steps.
+
+        .. image:: ../docs/img/scheduler_loop.jpg
+
         :return: None
         """
         self.executor.start()
@@ -1641,17 +1530,13 @@ def _execute_helper(self, processor_manager):
         self.log.info("Resetting orphaned tasks for active dag runs")
         self.reset_state_for_orphaned_tasks()
 
+        # Start after resetting orphaned tasks to avoid stressing out DB.
+        self.processor_agent.start()
+
         execute_start_time = timezone.utcnow()
 
-        # Last time stats were printed
-        last_stat_print_time = datetime.datetime(2000, 1, 1, 
tzinfo=timezone.utc)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = timezone.utcnow()
-        # Last time that the DAG dir was traversed to look for files
-        last_dag_dir_refresh_time = timezone.utcnow()
-
-        # Use this value initially
-        known_file_paths = processor_manager.file_paths
 
         # For the execute duration, parse and schedule DAGs
         while (timezone.utcnow() - execute_start_time).total_seconds() < \
@@ -1659,60 +1544,47 @@ def _execute_helper(self, processor_manager):
             self.log.debug("Starting Loop...")
             loop_start_time = time.time()
 
-            # Traverse the DAG directory for Python files containing DAGs
-            # periodically
-            elapsed_time_since_refresh = (timezone.utcnow() -
-                                          
last_dag_dir_refresh_time).total_seconds()
-
-            if elapsed_time_since_refresh > self.dag_dir_list_interval:
-                # Build up a list of Python files that could contain DAGs
-                self.log.info("Searching for files in %s", self.subdir)
-                known_file_paths = list_py_file_paths(self.subdir)
-                last_dag_dir_refresh_time = timezone.utcnow()
-                self.log.info(
-                    "There are %s files in %s", len(known_file_paths), 
self.subdir)
-
-                processor_manager.set_file_paths(known_file_paths)
-
-                self.log.debug("Removing old import errors")
-                
self.clear_nonexistent_import_errors(known_file_paths=known_file_paths)
-
-            # Kick of new processes and collect results from finished ones
-            self.log.debug("Heartbeating the process manager")
-            simple_dags = processor_manager.heartbeat()
-
             if self.using_sqlite:
+                self.processor_agent.heartbeat()
                 # For the sqlite case w/ 1 thread, wait until the processor
                 # is finished to avoid concurrent access to the DB.
                 self.log.debug(
                     "Waiting for processors to finish since we're using 
sqlite")
+                self.processor_agent.wait_until_finished()
 
-                processor_manager.wait_until_finished()
+            self.log.info("Harvesting DAG parsing results")
+            simple_dags = self.processor_agent.harvest_simple_dags()
 
             # Send tasks for execution if available
             simple_dag_bag = SimpleDagBag(simple_dags)
             if len(simple_dags) > 0:
-
-                # Handle cases where a DAG run state is set (perhaps manually) 
to
-                # a non-running state. Handle task instances that belong to
-                # DAG runs in those states
-
-                # If a task instance is up for retry but the corresponding DAG 
run
-                # isn't running, mark the task instance as FAILED so we don't 
try
-                # to re-run it.
-                self._change_state_for_tis_without_dagrun(simple_dag_bag,
-                                                          [State.UP_FOR_RETRY],
-                                                          State.FAILED)
-                # If a task instance is scheduled or queued, but the 
corresponding
-                # DAG run isn't running, set the state to NONE so we don't try 
to
-                # re-run it.
-                self._change_state_for_tis_without_dagrun(simple_dag_bag,
-                                                          [State.QUEUED,
-                                                           State.SCHEDULED],
-                                                          State.NONE)
-
-                self._execute_task_instances(simple_dag_bag,
-                                             (State.SCHEDULED,))
+                try:
+                    simple_dag_bag = SimpleDagBag(simple_dags)
+
+                    # Handle cases where a DAG run state is set (perhaps 
manually) to
+                    # a non-running state. Handle task instances that belong to
+                    # DAG runs in those states
+
+                    # If a task instance is up for retry but the corresponding 
DAG run
+                    # isn't running, mark the task instance as FAILED so we 
don't try
+                    # to re-run it.
+                    self._change_state_for_tis_without_dagrun(simple_dag_bag,
+                                                              
[State.UP_FOR_RETRY],
+                                                              State.FAILED)
+                    # If a task instance is scheduled or queued, but the 
corresponding
+                    # DAG run isn't running, set the state to NONE so we don't 
try to
+                    # re-run it.
+                    self._change_state_for_tis_without_dagrun(simple_dag_bag,
+                                                              [State.QUEUED,
+                                                               
State.SCHEDULED],
+                                                              State.NONE)
+
+                    self._execute_task_instances(simple_dag_bag,
+                                                 (State.SCHEDULED,))
+                except Exception as e:
+                    self.log.error("Error queuing tasks")
+                    self.log.exception(e)
+                    continue
 
             # Call heartbeats
             self.log.debug("Heartbeating the executor")
@@ -1729,40 +1601,34 @@ def _execute_helper(self, processor_manager):
                 self.heartbeat()
                 last_self_heartbeat_time = timezone.utcnow()
 
-            # Occasionally print out stats about how fast the files are 
getting processed
-            if ((timezone.utcnow() - last_stat_print_time).total_seconds() >
-                    self.print_stats_interval):
-                if len(known_file_paths) > 0:
-                    self._log_file_processing_stats(known_file_paths,
-                                                    processor_manager)
-                last_stat_print_time = timezone.utcnow()
-
             loop_end_time = time.time()
+            loop_duration = loop_end_time - loop_start_time
             self.log.debug(
                 "Ran scheduling loop in %.2f seconds",
-                loop_end_time - loop_start_time)
+                loop_duration)
             self.log.debug("Sleeping for %.2f seconds", 
self._processor_poll_interval)
             time.sleep(self._processor_poll_interval)
 
             # Exit early for a test mode
-            if processor_manager.max_runs_reached():
-                self.log.info(
-                    "Exiting loop as all files have been processed %s times",
-                    self.num_runs)
+            if self.processor_agent.done:
+                self.log.info("Exiting scheduler loop as all files"
+                              " have been processed {} 
times".format(self.num_runs))
                 break
 
+            if loop_duration < 1:
+                sleep_length = 1 - loop_duration
+                self.log.debug(
+                    "Sleeping for {0:.2f} seconds to prevent excessive logging"
+                    .format(sleep_length))
+                sleep(sleep_length)
+
         # Stop any processors
-        processor_manager.terminate()
+        self.processor_agent.terminate()
 
         # Verify that all files were processed, and if so, deactivate DAGs that
         # haven't been touched by the scheduler as they likely have been
         # deleted.
-        all_files_processed = True
-        for file_path in known_file_paths:
-            if processor_manager.get_last_finish_time(file_path) is None:
-                all_files_processed = False
-                break
-        if all_files_processed:
+        if self.processor_agent.all_files_processed:
             self.log.info(
                 "Deactivating DAGs that haven't been touched since %s",
                 execute_start_time.isoformat()
@@ -1774,7 +1640,7 @@ def _execute_helper(self, processor_manager):
         settings.Session.remove()
 
     @provide_session
-    def process_file(self, file_path, pickle_dags=False, session=None):
+    def process_file(self, file_path, zombies, pickle_dags=False, 
session=None):
         """
         Process a Python file containing Airflow DAGs.
 
@@ -1793,6 +1659,8 @@ def process_file(self, file_path, pickle_dags=False, 
session=None):
 
         :param file_path: the path to the Python file that should be executed
         :type file_path: unicode
+        :param zombies: zombie task instances to kill.
+        :type zombies: list[SimpleTaskInstance]
         :param pickle_dags: whether serialize the DAGs found in the file and
             save them to the db
         :type pickle_dags: bool
@@ -1887,7 +1755,7 @@ def process_file(self, file_path, pickle_dags=False, 
session=None):
         except Exception:
             self.log.exception("Error logging import errors!")
         try:
-            dagbag.kill_zombies()
+            dagbag.kill_zombies(zombies)
         except Exception:
             self.log.exception("Error killing zombies!")
 
diff --git a/airflow/models.py b/airflow/models.py
index 6cdfc0fd81..c755268a43 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -444,39 +444,28 @@ def process_file(self, filepath, only_if_updated=True, 
safe_mode=True):
         return found_dags
 
     @provide_session
-    def kill_zombies(self, session=None):
-        """
-        Fails tasks that haven't had a heartbeat in too long
-        """
-        from airflow.jobs import LocalTaskJob as LJ
-        self.log.info("Finding 'running' jobs without a recent heartbeat")
-        TI = TaskInstance
-        secs = configuration.conf.getint('scheduler', 
'scheduler_zombie_task_threshold')
-        limit_dttm = timezone.utcnow() - timedelta(seconds=secs)
-        self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
-
-        tis = (
-            session.query(TI)
-            .join(LJ, TI.job_id == LJ.id)
-            .filter(TI.state == State.RUNNING)
-            .filter(
-                or_(
-                    LJ.state != State.RUNNING,
-                    LJ.latest_heartbeat < limit_dttm,
-                ))
-            .all()
-        )
-
-        for ti in tis:
-            if ti and ti.dag_id in self.dags:
-                dag = self.dags[ti.dag_id]
-                if ti.task_id in dag.task_ids:
-                    task = dag.get_task(ti.task_id)
-
-                    # now set non db backed vars on ti
-                    ti.task = task
+    def kill_zombies(self, zombies, session=None):
+        """
+        Fail given zombie tasks, which are tasks that haven't
+        had a heartbeat for too long, in the current DagBag.
+
+        :param zombies: zombie task instances to kill.
+        :type zombies: SimpleTaskInstance
+        :param session: DB session.
+        :type Session.
+        """
+        for zombie in zombies:
+            if zombie.dag_id in self.dags:
+                dag = self.dags[zombie.dag_id]
+                if zombie.task_id in dag.task_ids:
+                    task = dag.get_task(zombie.task_id)
+                    ti = TaskInstance(task, zombie.execution_date)
+                    # Get properties needed for failure handling from 
SimpleTaskInstance.
+                    ti.start_date = zombie.start_date
+                    ti.end_date = zombie.end_date
+                    ti.try_number = zombie.try_number
+                    ti.state = zombie.state
                     ti.test_mode = configuration.getboolean('core', 
'unit_test_mode')
-
                     ti.handle_failure("{} detected as zombie".format(ti),
                                       ti.test_mode, ti.get_template_context())
                     self.log.info(
diff --git a/airflow/operators/bash_operator.py 
b/airflow/operators/bash_operator.py
index 941070bdf4..f9c2e1bbc0 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -47,7 +47,7 @@ class BashOperator(BaseOperator):
         of inheriting the current process environment, which is the default
         behavior. (templated)
     :type env: dict
-    :param output_encoding: Output encoding of bash command 
+    :param output_encoding: Output encoding of bash command
     :type output_encoding: str
     """
     template_fields = ('bash_command', 'env')
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 8b77c796d2..47f473e9aa 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -22,17 +22,33 @@
 from __future__ import print_function
 from __future__ import unicode_literals
 
+import logging
+import multiprocessing
 import os
 import re
+import signal
+import sys
 import time
 import zipfile
 from abc import ABCMeta, abstractmethod
 from collections import defaultdict
+from collections import namedtuple
+from datetime import timedelta
 
+import psutil
+from six.moves import range, reload_module
+from sqlalchemy import or_
+from tabulate import tabulate
+
+# To avoid circular imports
+import airflow.models
+from airflow import configuration as conf
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
 from airflow.utils import timezone
+from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
 
 
 class SimpleDag(BaseDag):
@@ -121,6 +137,45 @@ def get_task_special_arg(self, task_id, special_arg_name):
             return None
 
 
+class SimpleTaskInstance(object):
+    def __init__(self, ti):
+        self._dag_id = ti.dag_id
+        self._task_id = ti.task_id
+        self._execution_date = ti.execution_date
+        self._start_date = ti.start_date
+        self._end_date = ti.end_date
+        self._try_number = ti.try_number
+        self._state = ti.state
+
+    @property
+    def dag_id(self):
+        return self._dag_id
+
+    @property
+    def task_id(self):
+        return self._task_id
+
+    @property
+    def execution_date(self):
+        return self._execution_date
+
+    @property
+    def start_date(self):
+        return self._start_date
+
+    @property
+    def end_date(self):
+        return self._end_date
+
+    @property
+    def try_number(self):
+        return self._try_number
+
+    @property
+    def state(self):
+        return self._state
+
+
 class SimpleDagBag(BaseDagBag):
     """
     A collection of SimpleDag objects with some convenience methods.
@@ -308,10 +363,243 @@ def file_path(self):
         raise NotImplementedError()
 
 
+DagParsingStat = namedtuple('DagParsingStat',
+                            ['file_paths', 'all_pids', 'done',
+                             'all_files_processed', 'result_count'])
+
+
+DagParsingSignal = namedtuple(
+    'DagParsingSignal',
+    ['AGENT_HEARTBEAT', 'MANAGER_DONE', 'TERMINATE_MANAGER', 'END_MANAGER'])(
+    'agent_heartbeat', 'manager_done', 'terminate_manager', 'end_manager')
+
+
+class DagFileProcessorAgent(LoggingMixin):
+    """
+    Agent for DAG file processing. It is responsible for all DAG parsing
+    related jobs in scheduler process. Mainly it can spin up 
DagFileProcessorManager
+    in a subprocess, collect DAG parsing results from it and communicate
+    signal/DAG parsing stat with it.
+    """
+
+    def __init__(self,
+                 dag_directory,
+                 file_paths,
+                 max_runs,
+                 processor_factory,
+                 async_mode):
+        """
+        :param dag_directory: Directory where DAG definitions are kept. All
+        files in file_paths should be under this directory
+        :type dag_directory: unicode
+        :param file_paths: list of file paths that contain DAG definitions
+        :type file_paths: list[unicode]
+        :param max_runs: The number of times to parse and schedule each file. 
-1
+        for unlimited.
+        :type max_runs: int
+        :param processor_factory: function that creates processors for DAG
+        definition files. Arguments are (dag_definition_path, log_file_path)
+        :type processor_factory: (unicode, unicode, list) -> 
(AbstractDagFileProcessor)
+        :param async_mode: Whether to start agent in async mode
+        :type async_mode: bool
+        """
+        self._file_paths = file_paths
+        self._file_path_queue = []
+        self._dag_directory = dag_directory
+        self._max_runs = max_runs
+        self._processor_factory = processor_factory
+        self._async_mode = async_mode
+        # Map from file path to the processor
+        self._processors = {}
+        # Map from file path to the last runtime
+        self._last_runtime = {}
+        # Map from file path to the last finish time
+        self._last_finish_time = {}
+        # Map from file path to the number of runs
+        self._run_count = defaultdict(int)
+        # Pids of DAG parse
+        self._all_pids = []
+        # Pipe for communicating signals
+        self._parent_signal_conn, self._child_signal_conn = 
multiprocessing.Pipe()
+        # Pipe for communicating DagParsingStat
+        self._stat_queue = multiprocessing.Queue()
+        self._result_queue = multiprocessing.Queue()
+        self._process = None
+        self._done = False
+        # Initialized as true so we do not deactivate w/o any actual DAG 
parsing.
+        self._all_files_processed = True
+        self._result_count = 0
+
+    def start(self):
+        """
+        Launch DagFileProcessorManager processor and start DAG parsing loop in 
manager.
+        """
+        self._process = self._launch_process(self._dag_directory,
+                                             self._file_paths,
+                                             self._max_runs,
+                                             self._processor_factory,
+                                             self._child_signal_conn,
+                                             self._stat_queue,
+                                             self._result_queue,
+                                             self._async_mode)
+        self.log.info("Launched DagFileProcessorManager with pid: {}"
+                      .format(self._process.pid))
+
+    def heartbeat(self):
+        """
+        Should only be used when launched DAG file processor manager in sync 
mode.
+        Send agent heartbeat signal to the manager.
+        """
+        self._parent_signal_conn.send(DagParsingSignal.AGENT_HEARTBEAT)
+
+    def wait_until_finished(self):
+        """
+        Should only be used when launched DAG file processor manager in sync 
mode.
+        Wait for done signal from the manager.
+        """
+        while True:
+            if self._parent_signal_conn.recv() == 
DagParsingSignal.MANAGER_DONE:
+                break
+
+    @staticmethod
+    def _launch_process(dag_directory,
+                        file_paths,
+                        max_runs,
+                        processor_factory,
+                        signal_conn,
+                        _stat_queue,
+                        result_queue,
+                        async_mode):
+        def helper():
+            # Reload configurations and settings to avoid collision with 
parent process.
+            # Because this process may need custom configurations that cannot 
be shared,
+            # e.g. RotatingFileHandler. And it can cause connection corruption 
if we
+            # do not recreate the SQLA connection pool.
+            os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
+            reload_module(airflow.config_templates.airflow_local_settings)
+            reload_module(airflow.settings)
+            del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
+            processor_manager = DagFileProcessorManager(dag_directory,
+                                                        file_paths,
+                                                        max_runs,
+                                                        processor_factory,
+                                                        signal_conn,
+                                                        _stat_queue,
+                                                        result_queue,
+                                                        async_mode)
+
+            processor_manager.start()
+
+        p = multiprocessing.Process(target=helper,
+                                    args=(),
+                                    name="DagFileProcessorManager")
+        p.start()
+        return p
+
+    def harvest_simple_dags(self):
+        """
+        Harvest DAG parsing results from result queue and sync metadata from 
stat queue.
+        :return: List of parsing result in SimpleDag format.
+        """
+        # Metadata and results to be harvested can be inconsistent,
+        # but it should not be a big problem.
+        self._sync_metadata()
+        # Heartbeating after syncing metadata so we do not restart manager
+        # if it processed all files for max_run times and exit normally.
+        self._heartbeat_manager()
+        simple_dags = []
+        # multiprocessing.Queue().qsize will not work on MacOS.
+        if sys.platform == "darwin":
+            qsize = self._result_count
+        else:
+            qsize = self._result_queue.qsize()
+        for _ in range(qsize):
+            simple_dags.append(self._result_queue.get())
+
+        self._result_count = 0
+
+        return simple_dags
+
+    def _heartbeat_manager(self):
+        """
+        Heartbeat DAG file processor and start it if it is not alive.
+        :return:
+        """
+        if self._process and not self._process.is_alive() and not self.done:
+            self.start()
+
+    def _sync_metadata(self):
+        """
+        Sync metadata from stat queue and only keep the latest stat.
+        :return:
+        """
+        while not self._stat_queue.empty():
+            stat = self._stat_queue.get()
+            self._file_paths = stat.file_paths
+            self._all_pids = stat.all_pids
+            self._done = stat.done
+            self._all_files_processed = stat.all_files_processed
+            self._result_count += stat.result_count
+
+    @property
+    def file_paths(self):
+        return self._file_paths
+
+    @property
+    def done(self):
+        return self._done
+
+    @property
+    def all_files_processed(self):
+        return self._all_files_processed
+
+    def terminate(self):
+        """
+        Send termination signal to DAG parsing processor manager
+        and expect it to terminate all DAG file processors.
+        """
+        self.log.info("Sending termination message to manager.")
+        self._child_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER)
+
+    def end(self):
+        """
+        Terminate (and then kill) the manager process launched.
+        :return:
+        """
+        if not self._process or not self._process.is_alive():
+            self.log.warn('Ending without manager process.')
+            return
+        this_process = psutil.Process(os.getpid())
+        manager_process = psutil.Process(self._process.pid)
+        # First try SIGTERM
+        if manager_process.is_running() \
+                and manager_process.pid in [x.pid for x in 
this_process.children()]:
+            self.log.info(
+                "Terminating manager process: {}".format(manager_process.pid))
+            manager_process.terminate()
+            # TODO: Remove magic number
+            timeout = 5
+            self.log.info("Waiting up to {}s for manager process to exit..."
+                          .format(timeout))
+            try:
+                psutil.wait_procs({manager_process}, timeout)
+            except psutil.TimeoutExpired:
+                self.log.debug("Ran out of time while waiting for "
+                               "processes to exit")
+
+        # Then SIGKILL
+        if manager_process.is_running() \
+                and manager_process.pid in [x.pid for x in 
this_process.children()]:
+            self.log.info("Killing manager process: 
{}".format(manager_process.pid))
+            manager_process.kill()
+            manager_process.wait()
+
+
 class DagFileProcessorManager(LoggingMixin):
     """
     Given a list of DAG definition files, this kicks off several processors
-    in parallel to process them. The parallelism is limited and as the
+    in parallel to process them and put the results to a multiprocessing.Queue
+    for DagFileProcessorAgent to harvest. The parallelism is limited and as the
     processors finish, more are launched. The files are processed over and
     over again, but no more often than the specified interval.
 
@@ -324,48 +612,320 @@ class DagFileProcessorManager(LoggingMixin):
     def __init__(self,
                  dag_directory,
                  file_paths,
-                 parallelism,
-                 process_file_interval,
                  max_runs,
-                 processor_factory):
+                 processor_factory,
+                 signal_conn,
+                 stat_queue,
+                 result_queue,
+                 async_mode=True):
         """
         :param dag_directory: Directory where DAG definitions are kept. All
         files in file_paths should be under this directory
         :type dag_directory: unicode
         :param file_paths: list of file paths that contain DAG definitions
         :type file_paths: list[unicode]
-        :param parallelism: maximum number of simultaneous process to run at 
once
-        :type parallelism: int
-        :param process_file_interval: process a file at most once every this
-        many seconds
-        :type process_file_interval: float
         :param max_runs: The number of times to parse and schedule each file. 
-1
         for unlimited.
         :type max_runs: int
-        :type process_file_interval: float
         :param processor_factory: function that creates processors for DAG
         definition files. Arguments are (dag_definition_path)
-        :type processor_factory: (unicode, unicode) -> 
(AbstractDagFileProcessor)
-
+        :type processor_factory: (unicode, unicode, list) -> 
(AbstractDagFileProcessor)
+        :param signal_conn: connection to communicate signal with processor 
agent.
+        :type signal_conn: Connection
+        :param stat_queue: the queue to use for passing back parsing stat to 
agent.
+        :type stat_queue: multiprocessing.Queue
+        :param result_queue: the queue to use for passing back the result to 
agent.
+        :type result_queue: multiprocessing.Queue
+        :param async_mode: whether to start the manager in async mode
+        :type async_mode: bool
         """
         self._file_paths = file_paths
         self._file_path_queue = []
-        self._parallelism = parallelism
         self._dag_directory = dag_directory
         self._max_runs = max_runs
-        self._process_file_interval = process_file_interval
         self._processor_factory = processor_factory
+        self._signal_conn = signal_conn
+        self._stat_queue = stat_queue
+        self._result_queue = result_queue
+        self._async_mode = async_mode
+
+        self._parallelism = conf.getint('scheduler', 'max_threads')
+        if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and 
self._parallelism > 1:
+            self.log.error("Cannot use more than 1 thread when using sqlite. "
+                           "Setting parallelism to 1")
+            self._parallelism = 1
+
+        # Parse and schedule each file no faster than this interval.
+        self._file_process_interval = conf.getint('scheduler',
+                                                  'min_file_process_interval')
+        # How often to print out DAG file processing stats to the log. Default 
to
+        # 30 seconds.
+        self.print_stats_interval = conf.getint('scheduler',
+                                                'print_stats_interval')
+        # How many seconds do we wait for tasks to heartbeat before mark them 
as zombies.
+        self._zombie_threshold_secs = (
+            conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
         # Map from file path to the processor
         self._processors = {}
         # Map from file path to the last runtime
         self._last_runtime = {}
         # Map from file path to the last finish time
         self._last_finish_time = {}
+        self._last_zombie_query_time = timezone.utcnow()
+        # Last time that the DAG dir was traversed to look for files
+        self.last_dag_dir_refresh_time = timezone.utcnow()
+        # Last time stats were printed
+        self.last_stat_print_time = timezone.datetime(2000, 1, 1)
+        # TODO: Remove magic number
+        self._zombie_query_interval = 10
         # Map from file path to the number of runs
         self._run_count = defaultdict(int)
-        # Scheduler heartbeat key.
+        # Manager heartbeat key.
         self._heart_beat_key = 'heart-beat'
 
+        # How often to scan the DAGs directory for new files. Default to 5 
minutes.
+        self.dag_dir_list_interval = conf.getint('scheduler',
+                                                 'dag_dir_list_interval')
+
+        self._log = logging.getLogger('airflow.processor_manager')
+
+        signal.signal(signal.SIGINT, self._exit_gracefully)
+        signal.signal(signal.SIGTERM, self._exit_gracefully)
+
+    def _exit_gracefully(self, signum, frame):
+        """
+        Helper method to clean up DAG file processors to avoid leaving orphan 
processes.
+        """
+        self.log.info("Exiting gracefully upon receiving signal 
{}".format(signum))
+        self.terminate()
+        self.end()
+        self.log.debug("Finished terminating DAG processors.")
+        sys.exit(os.EX_OK)
+
+    def start(self):
+        """
+        Use multiple processes to parse and generate tasks for the
+        DAGs in parallel. By processing them in separate processes,
+        we can get parallelism and isolation from potentially harmful
+        user code.
+        :return:
+        """
+
+        self.log.info("Processing files using up to {} processes at a time "
+                      .format(self._parallelism))
+        self.log.info("Process each file at most once every {} seconds"
+                      .format(self._file_process_interval))
+        self.log.info("Checking for new files in {} every {} seconds"
+                      .format(self._dag_directory, self.dag_dir_list_interval))
+
+        if self._async_mode:
+            self.log.debug("Starting DagFileProcessorManager in async mode")
+            self.start_in_async()
+        else:
+            self.log.debug("Starting DagFileProcessorManager in sync mode")
+            self.start_in_sync()
+
+    def start_in_async(self):
+        """
+        Parse DAG files repeatedly in a standalone loop.
+        """
+        while True:
+            loop_start_time = time.time()
+
+            if self._signal_conn.poll():
+                agent_signal = self._signal_conn.recv()
+                if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
+                    self.terminate()
+                    break
+                elif agent_signal == DagParsingSignal.END_MANAGER:
+                    self.end()
+                    sys.exit(os.EX_OK)
+
+            self._refresh_dag_dir()
+
+            simple_dags = self.heartbeat()
+            for simple_dag in simple_dags:
+                self._result_queue.put(simple_dag)
+
+            self._print_stat()
+
+            all_files_processed = all(self.get_last_finish_time(x) is not None
+                                      for x in self.file_paths)
+            max_runs_reached = self.max_runs_reached()
+
+            dag_parsing_stat = DagParsingStat(self._file_paths,
+                                              self.get_all_pids(),
+                                              max_runs_reached,
+                                              all_files_processed,
+                                              len(simple_dags))
+            self._stat_queue.put(dag_parsing_stat)
+
+            if max_runs_reached:
+                self.log.info("Exiting dag parsing loop as all files "
+                              "have been processed %s times", self._max_runs)
+                break
+
+            loop_duration = time.time() - loop_start_time
+            if loop_duration < 1:
+                sleep_length = 1 - loop_duration
+                self.log.debug("Sleeping for {0:.2f} seconds "
+                               "to prevent excessive 
logging".format(sleep_length))
+                time.sleep(sleep_length)
+
+    def start_in_sync(self):
+        """
+        Parse DAG files in a loop controlled by DagParsingSignal.
+        Actual DAG parsing loop will run once upon receiving one
+        agent heartbeat message and will report done when finished the loop.
+        """
+        while True:
+            agent_signal = self._signal_conn.recv()
+            if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
+                self.terminate()
+                break
+            elif agent_signal == DagParsingSignal.END_MANAGER:
+                self.end()
+                sys.exit(os.EX_OK)
+            elif agent_signal == DagParsingSignal.AGENT_HEARTBEAT:
+
+                self._refresh_dag_dir()
+
+                simple_dags = self.heartbeat()
+                for simple_dag in simple_dags:
+                    self._result_queue.put(simple_dag)
+
+                self._print_stat()
+
+                all_files_processed = all(self.get_last_finish_time(x) is not 
None
+                                          for x in self.file_paths)
+                max_runs_reached = self.max_runs_reached()
+
+                dag_parsing_stat = DagParsingStat(self._file_paths,
+                                                  self.get_all_pids(),
+                                                  self.max_runs_reached(),
+                                                  all_files_processed,
+                                                  len(simple_dags))
+                self._stat_queue.put(dag_parsing_stat)
+
+                self.wait_until_finished()
+                self._signal_conn.send(DagParsingSignal.MANAGER_DONE)
+
+                if max_runs_reached:
+                    self.log.info("Exiting dag parsing loop as all files "
+                                  "have been processed %s times", 
self._max_runs)
+                    self._signal_conn.send(DagParsingSignal.MANAGER_DONE)
+                    break
+
+    def _refresh_dag_dir(self):
+        """
+        Refresh file paths from dag dir if we haven't done it for too long.
+        """
+        elapsed_time_since_refresh = (timezone.utcnow() -
+                                      
self.last_dag_dir_refresh_time).total_seconds()
+        if elapsed_time_since_refresh > self.dag_dir_list_interval:
+            # Build up a list of Python files that could contain DAGs
+            self.log.info(
+                "Searching for files in {}".format(self._dag_directory))
+            self._file_paths = list_py_file_paths(self._dag_directory)
+            self.last_dag_dir_refresh_time = timezone.utcnow()
+            self.log.info("There are {} files in {}"
+                          .format(len(self._file_paths),
+                                  self._dag_directory))
+            self.set_file_paths(self._file_paths)
+
+            try:
+                self.log.debug("Removing old import errors")
+                self.clear_nonexistent_import_errors()
+            except Exception:
+                self.log.exception("Error removing old import errors")
+
+    def _print_stat(self):
+        """
+        Occasionally print out stats about how fast the files are getting 
processed
+        :return:
+        """
+        if ((timezone.utcnow() - self.last_stat_print_time).total_seconds() >
+                self.print_stats_interval):
+            if len(self._file_paths) > 0:
+                self._log_file_processing_stats(self._file_paths)
+            self.last_stat_print_time = timezone.utcnow()
+
+    @provide_session
+    def clear_nonexistent_import_errors(self, session):
+        """
+        Clears import errors for files that no longer exist.
+        :param session: session for ORM operations
+        :type session: sqlalchemy.orm.session.Session
+        """
+        query = session.query(airflow.models.ImportError)
+        if self._file_paths:
+            query = query.filter(
+                ~airflow.models.ImportError.filename.in_(self._file_paths)
+            )
+        query.delete(synchronize_session='fetch')
+        session.commit()
+
+    def _log_file_processing_stats(self, known_file_paths):
+        """
+        Print out stats about how files are getting processed.
+        :param known_file_paths: a list of file paths that may contain Airflow
+            DAG definitions
+        :type known_file_paths: list[unicode]
+        :return: None
+        """
+
+        # File Path: Path to the file containing the DAG definition
+        # PID: PID associated with the process that's processing the file. May
+        # be empty.
+        # Runtime: If the process is currently running, how long it's been
+        # running for in seconds.
+        # Last Runtime: If the process ran before, how long did it take to
+        # finish in seconds
+        # Last Run: When the file finished processing in the previous run.
+        headers = ["File Path",
+                   "PID",
+                   "Runtime",
+                   "Last Runtime",
+                   "Last Run"]
+
+        rows = []
+        for file_path in known_file_paths:
+            last_runtime = self.get_last_runtime(file_path)
+            processor_pid = self.get_pid(file_path)
+            processor_start_time = self.get_start_time(file_path)
+            runtime = ((timezone.utcnow() - 
processor_start_time).total_seconds()
+                       if processor_start_time else None)
+            last_run = self.get_last_finish_time(file_path)
+
+            rows.append((file_path,
+                         processor_pid,
+                         runtime,
+                         last_runtime,
+                         last_run))
+
+        # Sort by longest last runtime. (Can't sort None values in python3)
+        rows = sorted(rows, key=lambda x: x[3] or 0.0)
+
+        formatted_rows = []
+        for file_path, pid, runtime, last_runtime, last_run in rows:
+            formatted_rows.append((file_path,
+                                   pid,
+                                   "{:.2f}s".format(runtime)
+                                   if runtime else None,
+                                   "{:.2f}s".format(last_runtime)
+                                   if last_runtime else None,
+                                   last_run.strftime("%Y-%m-%dT%H:%M:%S")
+                                   if last_run else None))
+        log_str = ("\n" +
+                   "=" * 80 +
+                   "\n" +
+                   "DAG File Processing Stats\n\n" +
+                   tabulate(formatted_rows, headers=headers) +
+                   "\n" +
+                   "=" * 80)
+
+        self.log.info(log_str)
+
     @property
     def file_paths(self):
         return self._file_paths
@@ -472,7 +1032,7 @@ def wait_until_finished(self):
 
     def heartbeat(self):
         """
-        This should be periodically called by the scheduler. This method will
+        This should be periodically called by the manager loop. This method 
will
         kick off new processes to process DAG definition files and read the
         results from the finished processors.
 
@@ -498,7 +1058,7 @@ def heartbeat(self):
                 running_processors[file_path] = processor
         self._processors = running_processors
 
-        self.log.debug("%s/%s scheduler processes running",
+        self.log.debug("%s/%s DAG parsing processes running",
                        len(self._processors), self._parallelism)
 
         self.log.debug("%s file paths queued for processing",
@@ -528,7 +1088,7 @@ def heartbeat(self):
                 last_finish_time = self.get_last_finish_time(file_path)
                 if (last_finish_time is not None and
                     (now - last_finish_time).total_seconds() <
-                        self._process_file_interval):
+                        self._file_process_interval):
                     file_paths_recently_processed.append(file_path)
 
             files_paths_at_run_limit = [file_path
@@ -553,11 +1113,13 @@ def heartbeat(self):
 
             self._file_path_queue.extend(files_paths_to_queue)
 
+        zombies = self._find_zombies()
+
         # Start more processors if we have enough slots and files to process
         while (self._parallelism - len(self._processors) > 0 and
                len(self._file_path_queue) > 0):
             file_path = self._file_path_queue.pop(0)
-            processor = self._processor_factory(file_path)
+            processor = self._processor_factory(file_path, zombies)
 
             processor.start()
             self.log.debug(
@@ -566,11 +1128,47 @@ def heartbeat(self):
             )
             self._processors[file_path] = processor
 
-        # Update scheduler heartbeat count.
+        # Update heartbeat count.
         self._run_count[self._heart_beat_key] += 1
 
         return simple_dags
 
+    @provide_session
+    def _find_zombies(self, session):
+        """
+        Find zombie task instances, which are tasks haven't heartbeated for 
too long.
+        :return: Zombie task instances in SimpleTaskInstance format.
+        """
+        now = timezone.utcnow()
+        zombies = []
+        if (now - self._last_zombie_query_time).total_seconds() \
+                > self._zombie_query_interval:
+            # to avoid circular imports
+            from airflow.jobs import LocalTaskJob as LJ
+            self.log.info("Finding 'running' jobs without a recent heartbeat")
+            TI = airflow.models.TaskInstance
+            limit_dttm = timezone.utcnow() - timedelta(
+                seconds=self._zombie_threshold_secs)
+            self.log.info(
+                "Failing jobs without heartbeat after {}".format(limit_dttm))
+
+            tis = (
+                session.query(TI)
+                .join(LJ, TI.job_id == LJ.id)
+                .filter(TI.state == State.RUNNING)
+                .filter(
+                    or_(
+                        LJ.state != State.RUNNING,
+                        LJ.latest_heartbeat < limit_dttm,
+                    )
+                ).all()
+            )
+            self._last_zombie_query_time = timezone.utcnow()
+            for ti in tis:
+                zombies.append(SimpleTaskInstance(ti))
+
+        return zombies
+
     def max_runs_reached(self):
         """
         :return: whether all file paths have been processed max_runs times
@@ -591,3 +1189,41 @@ def terminate(self):
         """
         for processor in self._processors.values():
             processor.terminate()
+
+    def end(self):
+        """
+        Kill all child processes on exit since we don't want to leave
+        them as orphaned.
+        """
+        pids_to_kill = self.get_all_pids()
+        if len(pids_to_kill) > 0:
+            # First try SIGTERM
+            this_process = psutil.Process(os.getpid())
+            # Only check child processes to ensure that we don't have a case
+            # where we kill the wrong process because a child process died
+            # but the PID got reused.
+            child_processes = [x for x in this_process.children(recursive=True)
+                               if x.is_running() and x.pid in pids_to_kill]
+            for child in child_processes:
+                self.log.info("Terminating child PID: {}".format(child.pid))
+                child.terminate()
+            # TODO: Remove magic number
+            timeout = 5
+            self.log.info(
+                "Waiting up to %s seconds for processes to exit...", timeout)
+            try:
+                psutil.wait_procs(
+                    child_processes, timeout=timeout,
+                    callback=lambda x: self.log.info('Terminated PID %s', 
x.pid))
+            except psutil.TimeoutExpired:
+                self.log.debug("Ran out of time while waiting for processes to 
exit")
+
+            # Then SIGKILL
+            child_processes = [x for x in this_process.children(recursive=True)
+                               if x.is_running() and x.pid in pids_to_kill]
+            if len(child_processes) > 0:
+                self.log.info("SIGKILL processes that did not terminate 
gracefully")
+                for child in child_processes:
+                    self.log.info("Killing child PID: {}".format(child.pid))
+                    child.kill()
+                    child.wait()
diff --git a/docs/img/scheduler_loop.jpg b/docs/img/scheduler_loop.jpg
new file mode 100644
index 0000000000..a4a5e285e5
Binary files /dev/null and b/docs/img/scheduler_loop.jpg differ
diff --git a/docs/scheduler.rst b/docs/scheduler.rst
index 9c5cb5c90c..dc43c111fe 100644
--- a/docs/scheduler.rst
+++ b/docs/scheduler.rst
@@ -3,8 +3,9 @@ Scheduling & Triggers
 
 The Airflow scheduler monitors all tasks and all DAGs, and triggers the
 task instances whose dependencies have been met. Behind the scenes,
-it monitors and stays in sync with a folder for all DAG objects it may contain,
-and periodically (every minute or so) inspects active tasks to see whether
+it spins up a subprocess, which monitors and stays in sync with a folder
+for all DAG objects it may contain, and periodically (every minute or so)
+collects DAG parsing results and inspects active tasks to see whether
 they can be triggered.
 
 The Airflow scheduler is designed to run as a persistent service in an
diff --git a/tests/jobs.py b/tests/jobs.py
index 9b265724b6..7d7a67e1d2 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -28,37 +28,36 @@
 import multiprocessing
 import os
 import shutil
-import six
 import threading
 import time
 import unittest
 from tempfile import mkdtemp
 
+import psutil
+import six
 import sqlalchemy
+from mock import Mock, patch, MagicMock, PropertyMock
 
+from airflow.utils.db import create_session
 from airflow import AirflowException, settings, models
+from airflow import configuration
 from airflow.bin import cli
 from airflow.executors import BaseExecutor, SequentialExecutor
 from airflow.jobs import BaseJob, BackfillJob, SchedulerJob, LocalTaskJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance 
as TI
-from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
 from airflow.utils import timezone
-
+from airflow.utils.dag_processing import SimpleDag, SimpleDagBag, 
list_py_file_paths
 from airflow.utils.dates import days_ago
 from airflow.utils.db import provide_session
+from airflow.utils.net import get_hostname
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
-from airflow.utils.dag_processing import SimpleDag, SimpleDagBag, 
list_py_file_paths
-from airflow.utils.net import get_hostname
-
-from mock import Mock, patch, MagicMock, PropertyMock
-from tests.executors.test_executor import TestExecutor
-
 from tests.core import TEST_DAG_FOLDER
+from tests.executors.test_executor import TestExecutor
 
-from airflow import configuration
 configuration.load_test_config()
 
 logger = logging.getLogger(__name__)
@@ -1260,10 +1259,10 @@ class SchedulerJobTest(unittest.TestCase):
 
     def setUp(self):
         self.dagbag = DagBag()
-        session = settings.Session()
-        session.query(models.DagRun).delete()
-        session.query(models.ImportError).delete()
-        session.commit()
+        with create_session() as session:
+            session.query(models.DagRun).delete()
+            session.query(models.ImportError).delete()
+            session.commit()
 
     @staticmethod
     def run_single_scheduler_loop_with_no_dags(dags_folder):
@@ -1286,6 +1285,21 @@ def run_single_scheduler_loop_with_no_dags(dags_folder):
     def _make_simple_dag_bag(self, dags):
         return SimpleDagBag([SimpleDag(dag) for dag in dags])
 
+    def test_no_orphan_process_will_be_left(self):
+        empty_dir = mkdtemp()
+        current_process = psutil.Process()
+        old_children = current_process.children(recursive=True)
+        scheduler = SchedulerJob(subdir=empty_dir,
+                                 num_runs=1)
+        scheduler.executor = TestExecutor()
+        scheduler.run()
+        shutil.rmtree(empty_dir)
+
+        # Remove potential noise created by previous tests.
+        current_children = set(current_process.children(recursive=True)) - set(
+            old_children)
+        self.assertFalse(current_children)
+
     def test_process_executor_events(self):
         dag_id = "test_process_executor_events"
         dag_id2 = "test_process_executor_events_2"
@@ -2046,13 +2060,13 @@ def test_execute_helper_reset_orphaned_tasks(self):
         session.commit()
 
         processor = mock.MagicMock()
-        processor.get_last_finish_time.return_value = None
 
         scheduler = SchedulerJob(num_runs=0, run_duration=0)
         executor = TestExecutor()
         scheduler.executor = executor
+        scheduler.processor_agent = processor
 
-        scheduler._execute_helper(processor_manager=processor)
+        scheduler._execute_helper()
 
         ti = dr.get_task_instance(task_id=op1.task_id, session=session)
         self.assertEqual(ti.state, State.NONE)
@@ -2159,10 +2173,10 @@ def test_dagrun_root_fail_unfinished(self):
 
         # Mark the successful task as never having run since we want to see if 
the
         # dagrun will be in a running state despite haveing an unfinished task.
-        session = settings.Session()
-        ti = dr.get_task_instance('test_dagrun_unfinished', session=session)
-        ti.state = State.NONE
-        session.commit()
+        with create_session() as session:
+            ti = dr.get_task_instance('test_dagrun_unfinished', 
session=session)
+            ti.state = State.NONE
+            session.commit()
         dr_state = dr.update_state()
         self.assertEqual(dr_state, State.RUNNING)
 
@@ -2204,44 +2218,44 @@ def test_scheduler_start_date(self):
         """
         Test that the scheduler respects start_dates, even when DAGS have run
         """
+        with create_session() as session:
+            dag_id = 'test_start_date_scheduling'
+            dag = self.dagbag.get_dag(dag_id)
+            dag.clear()
+            self.assertTrue(dag.start_date > datetime.datetime.utcnow())
 
-        dag_id = 'test_start_date_scheduling'
-        dag = self.dagbag.get_dag(dag_id)
-        dag.clear()
-        self.assertTrue(dag.start_date > datetime.datetime.utcnow())
-
-        scheduler = SchedulerJob(dag_id,
-                                 num_runs=2)
-        scheduler.run()
+            scheduler = SchedulerJob(dag_id,
+                                     num_runs=2)
+            scheduler.run()
 
-        # zero tasks ran
-        session = settings.Session()
-        self.assertEqual(
-            len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)
+            # zero tasks ran
+            self.assertEqual(
+                len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)
+            session.commit()
 
-        # previously, running this backfill would kick off the Scheduler
-        # because it would take the most recent run and start from there
-        # That behavior still exists, but now it will only do so if after the
-        # start date
-        backfill = BackfillJob(
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE)
-        backfill.run()
+            # previously, running this backfill would kick off the Scheduler
+            # because it would take the most recent run and start from there
+            # That behavior still exists, but now it will only do so if after 
the
+            # start date
+            backfill = BackfillJob(
+                dag=dag,
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE)
+            backfill.run()
 
-        # one task ran
-        session = settings.Session()
-        self.assertEqual(
-            len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
+            # one task ran
+            self.assertEqual(
+                len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
+            session.commit()
 
-        scheduler = SchedulerJob(dag_id,
-                                 num_runs=2)
-        scheduler.run()
+            scheduler = SchedulerJob(dag_id,
+                                     num_runs=2)
+            scheduler.run()
 
-        # still one task
-        session = settings.Session()
-        self.assertEqual(
-            len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
+            # still one task
+            self.assertEqual(
+                len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
+            session.commit()
 
     def test_scheduler_task_start_date(self):
         """
@@ -3004,7 +3018,9 @@ def test_scheduler_run_duration(self):
         logging.info("Test ran in %.2fs, expected %.2fs",
                      run_duration,
                      expected_run_duration)
-        self.assertLess(run_duration - expected_run_duration, 5.0)
+        # 5s to wait for child process to exit and 1s dummy sleep
+        # in scheduler loop to prevent excessive logs.
+        self.assertLess(run_duration - expected_run_duration, 6.0)
 
     def test_dag_with_system_exit(self):
         """
@@ -3028,9 +3044,9 @@ def test_dag_with_system_exit(self):
                                  subdir=dag_directory,
                                  num_runs=1)
         scheduler.run()
-        session = settings.Session()
-        self.assertEqual(
-            len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
+        with create_session() as session:
+            self.assertEqual(
+                len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
 
     def test_dag_get_active_runs(self):
         """
@@ -3166,8 +3182,8 @@ def setup_dag(dag_id, schedule_interval, start_date, 
catchup):
         self.assertIsNotNone(dr)
 
     def 
test_add_unparseable_file_before_sched_start_creates_import_error(self):
+        dags_folder = mkdtemp()
         try:
-            dags_folder = mkdtemp()
             unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
             with open(unparseable_filename, 'w') as unparseable_file:
                 unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
@@ -3175,8 +3191,8 @@ def 
test_add_unparseable_file_before_sched_start_creates_import_error(self):
         finally:
             shutil.rmtree(dags_folder)
 
-        session = settings.Session()
-        import_errors = session.query(models.ImportError).all()
+        with create_session() as session:
+            import_errors = session.query(models.ImportError).all()
 
         self.assertEqual(len(import_errors), 1)
         import_error = import_errors[0]
@@ -3186,8 +3202,8 @@ def 
test_add_unparseable_file_before_sched_start_creates_import_error(self):
                          "invalid syntax ({}, line 
1)".format(TEMP_DAG_FILENAME))
 
     def test_add_unparseable_file_after_sched_start_creates_import_error(self):
+        dags_folder = mkdtemp()
         try:
-            dags_folder = mkdtemp()
             unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
             self.run_single_scheduler_loop_with_no_dags(dags_folder)
 
@@ -3197,8 +3213,8 @@ def 
test_add_unparseable_file_after_sched_start_creates_import_error(self):
         finally:
             shutil.rmtree(dags_folder)
 
-        session = settings.Session()
-        import_errors = session.query(models.ImportError).all()
+        with create_session() as session:
+            import_errors = session.query(models.ImportError).all()
 
         self.assertEqual(len(import_errors), 1)
         import_error = import_errors[0]
@@ -3218,8 +3234,8 @@ def test_no_import_errors_with_parseable_dag(self):
         finally:
             shutil.rmtree(dags_folder)
 
-        session = settings.Session()
-        import_errors = session.query(models.ImportError).all()
+        with create_session() as session:
+            import_errors = session.query(models.ImportError).all()
 
         self.assertEqual(len(import_errors), 0)
 
@@ -3291,8 +3307,8 @@ def test_remove_file_clears_import_error(self):
         # Rerun the scheduler once the dag file has been removed
         self.run_single_scheduler_loop_with_no_dags(dags_folder)
 
-        session = settings.Session()
-        import_errors = session.query(models.ImportError).all()
+        with create_session() as session:
+            import_errors = session.query(models.ImportError).all()
 
         self.assertEqual(len(import_errors), 0)
 
diff --git a/tests/models.py b/tests/models.py
index f2d36a263b..cdef617178 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -23,42 +23,44 @@
 from __future__ import unicode_literals
 
 import datetime
+import inspect
 import logging
 import os
-import pendulum
-import unittest
-import time
-import six
 import re
-import urllib
 import textwrap
-import inspect
+import time
+import unittest
+import urllib
+from tempfile import NamedTemporaryFile, mkdtemp
+
+import pendulum
+import six
+from mock import ANY, Mock, patch
+from parameterized import parameterized
 
-from airflow import configuration, models, settings, AirflowException
+from airflow import AirflowException, configuration, models, settings
 from airflow.exceptions import AirflowDagCycleException, AirflowSkipException
 from airflow.jobs import BackfillJob
+from airflow.models import Connection
 from airflow.models import DAG, TaskInstance as TI
-from airflow.models import State as ST
 from airflow.models import DagModel, DagRun, DagStat
-from airflow.models import clear_task_instances
-from airflow.models import XCom
-from airflow.models import Connection
-from airflow.models import SkipMixin
 from airflow.models import KubeResourceVersion, KubeWorkerIdentifier
-from airflow.jobs import LocalTaskJob
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.models import SkipMixin
+from airflow.models import State as ST
+from airflow.models import XCom
+from airflow.models import clear_task_instances
 from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import PythonOperator
 from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils import timezone
-from airflow.utils.weight_rule import WeightRule
+from airflow.utils.dag_processing import SimpleTaskInstance
+from airflow.utils.db import create_session
 from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
-from mock import patch, Mock, ANY
-from parameterized import parameterized
-from tempfile import mkdtemp, NamedTemporaryFile
+from airflow.utils.weight_rule import WeightRule
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
@@ -1665,31 +1667,28 @@ def test_process_file_with_none(self):
         self.assertEqual([], dagbag.process_file(None))
 
     @patch.object(TI, 'handle_failure')
-    def test_kill_zombies(self, mock_ti):
+    def test_kill_zombies(self, mock_ti_handle_failure):
         """
         Test that kill zombies call TIs failure handler with proper context
         """
         dagbag = models.DagBag()
-        session = settings.Session
-        dag = dagbag.get_dag('example_branch_operator')
-        task = dag.get_task(task_id='run_this_first')
-
-        ti = TI(task, datetime.datetime.now() - datetime.timedelta(1), 
'running')
-        lj = LocalTaskJob(ti)
-        lj.state = State.SHUTDOWN
-
-        session.add(lj)
-        session.commit()
-
-        ti.job_id = lj.id
-
-        session.add(ti)
-        session.commit()
-
-        dagbag.kill_zombies()
-        mock_ti.assert_called_with(ANY,
-                                   configuration.getboolean('core', 
'unit_test_mode'),
-                                   ANY)
+        with create_session() as session:
+            session.query(TI).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            task = dag.get_task(task_id='run_this_first')
+
+            ti = TI(task, DEFAULT_DATE, State.RUNNING)
+
+            session.add(ti)
+            session.commit()
+
+            zombies = [SimpleTaskInstance(ti)]
+            dagbag.kill_zombies(zombies)
+            mock_ti_handle_failure \
+                .assert_called_with(ANY,
+                                    configuration.getboolean('core',
+                                                             'unit_test_mode'),
+                                    ANY)
 
 
 class TaskInstanceTest(unittest.TestCase):
diff --git a/tests/utils/test_dag_processing.py 
b/tests/utils/test_dag_processing.py
index 687d8b13c2..4043caec56 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -17,11 +17,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
 import unittest
+from datetime import timedelta
 
 from mock import MagicMock
 
-from airflow.utils.dag_processing import DagFileProcessorManager
+from airflow import configuration as conf
+from airflow.jobs import DagFileProcessor
+from airflow.jobs import LocalTaskJob as LJ
+from airflow.models import DagBag, TaskInstance as TI
+from airflow.utils import timezone
+from airflow.utils.dag_processing import (DagFileProcessorAgent, 
DagFileProcessorManager,
+                                          SimpleTaskInstance)
+from airflow.utils.db import create_session
+from airflow.utils.state import State
+
+TEST_DAG_FOLDER = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)), os.pardir, 'dags')
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
 class TestDagFileProcessorManager(unittest.TestCase):
@@ -29,10 +44,12 @@ def 
test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',
             file_paths=['abc.txt'],
-            parallelism=1,
-            process_file_interval=1,
             max_runs=1,
-            processor_factory=MagicMock().return_value)
+            processor_factory=MagicMock().return_value,
+            signal_conn=MagicMock(),
+            stat_queue=MagicMock(),
+            result_queue=MagicMock,
+            async_mode=True)
 
         mock_processor = MagicMock()
         mock_processor.stop.side_effect = AttributeError(
@@ -48,10 +65,12 @@ def 
test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',
             file_paths=['abc.txt'],
-            parallelism=1,
-            process_file_interval=1,
             max_runs=1,
-            processor_factory=MagicMock().return_value)
+            processor_factory=MagicMock().return_value,
+            signal_conn=MagicMock(),
+            stat_queue=MagicMock(),
+            result_queue=MagicMock,
+            async_mode=True)
 
         mock_processor = MagicMock()
         mock_processor.stop.side_effect = AttributeError(
@@ -62,3 +81,107 @@ def 
test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
 
         manager.set_file_paths(['abc.txt'])
         self.assertDictEqual(manager._processors, {'abc.txt': mock_processor})
+
+    def test_find_zombies(self):
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            file_paths=['abc.txt'],
+            max_runs=1,
+            processor_factory=MagicMock().return_value,
+            signal_conn=MagicMock(),
+            stat_queue=MagicMock(),
+            result_queue=MagicMock,
+            async_mode=True)
+
+        dagbag = DagBag(TEST_DAG_FOLDER)
+        with create_session() as session:
+            session.query(LJ).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            task = dag.get_task(task_id='run_this_first')
+
+            ti = TI(task, DEFAULT_DATE, State.RUNNING)
+            lj = LJ(ti)
+            lj.state = State.SHUTDOWN
+            lj.id = 1
+            ti.job_id = lj.id
+
+            session.add(lj)
+            session.add(ti)
+            session.commit()
+
+            manager._last_zombie_query_time = timezone.utcnow() - timedelta(
+                seconds=manager._zombie_threshold_secs + 1)
+            zombies = manager._find_zombies()
+            self.assertEquals(1, len(zombies))
+            self.assertIsInstance(zombies[0], SimpleTaskInstance)
+            self.assertEquals(ti.dag_id, zombies[0].dag_id)
+            self.assertEquals(ti.task_id, zombies[0].task_id)
+            self.assertEquals(ti.execution_date, zombies[0].execution_date)
+
+            session.query(TI).delete()
+            session.query(LJ).delete()
+
+
+class TestDagFileProcessorAgent(unittest.TestCase):
+    def test_parse_once(self):
+        def processor_factory(file_path, zombies):
+            return DagFileProcessor(file_path,
+                                    False,
+                                    [],
+                                    zombies)
+
+        test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py')
+        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        processor_agent = DagFileProcessorAgent(test_dag_path,
+                                                [test_dag_path],
+                                                1,
+                                                processor_factory,
+                                                async_mode)
+        processor_agent.start()
+        parsing_result = []
+        while not processor_agent.done:
+            if not async_mode:
+                processor_agent.heartbeat()
+                processor_agent.wait_until_finished()
+            parsing_result.extend(processor_agent.harvest_simple_dags())
+
+        dag_ids = [result.dag_id for result in parsing_result]
+        self.assertEqual(dag_ids.count('test_start_date_scheduling'), 1)
+
+    def test_launch_process(self):
+        def processor_factory(file_path, zombies):
+            return DagFileProcessor(file_path,
+                                    False,
+                                    [],
+                                    zombies)
+
+        test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py')
+        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+
+        log_file_loc = conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
+        try:
+            os.remove(log_file_loc)
+        except OSError:
+            pass
+
+        # Starting dag processing with 0 max_runs to avoid redundent 
operations.
+        processor_agent = DagFileProcessorAgent(test_dag_path,
+                                                [],
+                                                0,
+                                                processor_factory,
+                                                async_mode)
+        manager_process = \
+            processor_agent._launch_process(processor_agent._dag_directory,
+                                            processor_agent._file_paths,
+                                            processor_agent._max_runs,
+                                            processor_agent._processor_factory,
+                                            processor_agent._child_signal_conn,
+                                            processor_agent._stat_queue,
+                                            processor_agent._result_queue,
+                                            processor_agent._async_mode)
+        if not async_mode:
+            processor_agent.heartbeat()
+
+        manager_process.join()
+
+        self.assertTrue(os.path.isfile(log_file_loc))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DAG parsing loop coupled with scheduler loop
> --------------------------------------------
>
>                 Key: AIRFLOW-2760
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2760
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Kevin Yang
>            Assignee: Kevin Yang
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Currently DAG parsing loop is coupled with scheduler loop, meaning that if 
> scheduler loop became slow, we will parse DAG slower.
> As a simple producer and consumer pattern, we shall have them decoupled and 
> completely remove the scheduling bottleneck placed by DAG parsing--which is 
> identified in Airbnb as the current biggest bottleneck.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to