This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 873f47617c37243ff3d33e7b437ba2430e0417a5 Author: Jarek Potiuk <[email protected]> AuthorDate: Mon Jun 29 16:40:18 2020 +0200 [AIRFLOW-6856] Bulk fetch paused_dag_ids (cherry picked from commit d031f844517a8d12e7d90af0c472ca00c64b8963) --- airflow/dag/base_dag.py | 8 -------- airflow/jobs/scheduler_job.py | 7 +------ airflow/utils/dag_processing.py | 9 --------- 3 files changed, 1 insertion(+), 23 deletions(-) diff --git a/airflow/dag/base_dag.py b/airflow/dag/base_dag.py index 0e65775..6e556a3 100644 --- a/airflow/dag/base_dag.py +++ b/airflow/dag/base_dag.py @@ -64,14 +64,6 @@ class BaseDag(object): raise NotImplementedError() @abstractmethod - def is_paused(self): - """ - :return: whether this DAG is paused or not - :rtype: bool - """ - raise NotImplementedError() - - @abstractmethod def pickle_id(self): """ :return: The pickle ID for this DAG, if it has one. Otherwise None. diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 9da73b7..0665779 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1277,10 +1277,6 @@ class SchedulerJob(BaseJob): self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id) continue - if dag.is_paused: - self.log.info("Not processing DAG %s since it's paused", dag.dag_id) - continue - self.log.info("Processing %s", dag.dag_id) dag_run = self.create_dag_run(dag) @@ -1581,8 +1577,7 @@ class SchedulerJob(BaseJob): for dag in dagbag.dags.values(): dag.sync_to_db() - paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values() - if dag.is_paused] + paused_dag_ids = models.DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids) # Pickle the DAGs (if necessary) and put them into a SimpleDag for dag_id in dagbag.dags: diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index c888726..6e4e045 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -76,7 +76,6 @@ class SimpleDag(BaseDag): self._dag_id = dag.dag_id 
self._task_ids = [task.task_id for task in dag.tasks] self._full_filepath = dag.full_filepath - self._is_paused = dag.is_paused self._concurrency = dag.concurrency self._pickle_id = pickle_id self._task_special_args = {} @@ -120,14 +119,6 @@ class SimpleDag(BaseDag): return self._concurrency @property - def is_paused(self): - """ - :return: whether this DAG is paused or not - :rtype: bool - """ - return self._is_paused - - @property def pickle_id(self): """ :return: The pickle ID for this DAG, if it has one. Otherwise None.
