This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 873f47617c37243ff3d33e7b437ba2430e0417a5
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Jun 29 16:40:18 2020 +0200

    [AIRFLOW-6856] Bulk fetch paused_dag_ids
    
      (cherry picked from commit d031f844517a8d12e7d90af0c472ca00c64b8963)
---
 airflow/dag/base_dag.py         | 8 --------
 airflow/jobs/scheduler_job.py   | 7 +------
 airflow/utils/dag_processing.py | 9 ---------
 3 files changed, 1 insertion(+), 23 deletions(-)

diff --git a/airflow/dag/base_dag.py b/airflow/dag/base_dag.py
index 0e65775..6e556a3 100644
--- a/airflow/dag/base_dag.py
+++ b/airflow/dag/base_dag.py
@@ -64,14 +64,6 @@ class BaseDag(object):
         raise NotImplementedError()
 
     @abstractmethod
-    def is_paused(self):
-        """
-        :return: whether this DAG is paused or not
-        :rtype: bool
-        """
-        raise NotImplementedError()
-
-    @abstractmethod
     def pickle_id(self):
         """
         :return: The pickle ID for this DAG, if it has one. Otherwise None.
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9da73b7..0665779 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1277,10 +1277,6 @@ class SchedulerJob(BaseJob):
                 self.log.error("DAG ID %s was not found in the DagBag", 
dag.dag_id)
                 continue
 
-            if dag.is_paused:
-                self.log.info("Not processing DAG %s since it's paused", 
dag.dag_id)
-                continue
-
             self.log.info("Processing %s", dag.dag_id)
 
             dag_run = self.create_dag_run(dag)
@@ -1581,8 +1577,7 @@ class SchedulerJob(BaseJob):
         for dag in dagbag.dags.values():
             dag.sync_to_db()
 
-        paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values()
-                          if dag.is_paused]
+        paused_dag_ids = 
models.DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
 
         # Pickle the DAGs (if necessary) and put them into a SimpleDag
         for dag_id in dagbag.dags:
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index c888726..6e4e045 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -76,7 +76,6 @@ class SimpleDag(BaseDag):
         self._dag_id = dag.dag_id
         self._task_ids = [task.task_id for task in dag.tasks]
         self._full_filepath = dag.full_filepath
-        self._is_paused = dag.is_paused
         self._concurrency = dag.concurrency
         self._pickle_id = pickle_id
         self._task_special_args = {}
@@ -120,14 +119,6 @@ class SimpleDag(BaseDag):
         return self._concurrency
 
     @property
-    def is_paused(self):
-        """
-        :return: whether this DAG is paused or not
-        :rtype: bool
-        """
-        return self._is_paused
-
-    @property
     def pickle_id(self):
         """
         :return: The pickle ID for this DAG, if it has one. Otherwise None.

Reply via email to