Repository: incubator-airflow Updated Branches: refs/heads/master 956699fe6 -> 028b3b88f
[AIRFLOW-1606][Airflow-1606][AIRFLOW-1605][AIRFLOW-160] DAG.sync_to_db is now a normal method Previously it was a static method that took as its first argument a DAG, which really meant it wasn't truly a static method. To avoid reversing the parameter order I have given sensible defaults from the one and only use in the rest of the code base. Also remove documented "sync_to_db" parameter on DagBag that no longer exists -- this doc string refers to a parameter that was removed in [AIRFLOW-160]. Closes #2605 from ashb/AIRFLOW-1606-db-sync_to_db-not-static Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/028b3b88 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/028b3b88 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/028b3b88 Branch: refs/heads/master Commit: 028b3b88ff4f191c78bf1d9c41bf43a792f640ff Parents: 956699f Author: Ash Berlin-Taylor <[email protected]> Authored: Wed Sep 13 13:26:39 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Sep 13 13:26:39 2017 +0200 ---------------------------------------------------------------------- airflow/models.py | 26 +++++++++++++------------- airflow/utils/db.py | 4 +--- 2 files changed, 14 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/028b3b88/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 7dc51d1..20d750e 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -173,10 +173,6 @@ class DagBag(BaseDagBag, LoggingMixin): :param include_examples: whether to include the examples that ship with airflow or not :type include_examples: bool - :param sync_to_db: whether to sync the properties of the DAGs to - the metadata DB while finding them, typically should be 
done - by the scheduler job only - :type sync_to_db: bool """ def __init__( @@ -3736,9 +3732,8 @@ class DAG(BaseDag, LoggingMixin): return run - @staticmethod @provide_session - def sync_to_db(dag, owner, sync_time, session=None): + def sync_to_db(self, owner=None, sync_time=None, session=None): """ Save attributes about this DAG to the DB. Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a @@ -3751,21 +3746,26 @@ class DAG(BaseDag, LoggingMixin): :return: None """ + if owner is None: + owner = self.owner + if sync_time is None: + sync_time = datetime.utcnow() + orm_dag = session.query( - DagModel).filter(DagModel.dag_id == dag.dag_id).first() + DagModel).filter(DagModel.dag_id == self.dag_id).first() if not orm_dag: - orm_dag = DagModel(dag_id=dag.dag_id) - dag.logger.info("Creating ORM DAG for %s", dag.dag_id) - orm_dag.fileloc = dag.fileloc - orm_dag.is_subdag = dag.is_subdag + orm_dag = DagModel(dag_id=self.dag_id) + self.logger.info("Creating ORM DAG for %s", self.dag_id) + orm_dag.fileloc = self.fileloc + orm_dag.is_subdag = self.is_subdag orm_dag.owners = owner orm_dag.is_active = True orm_dag.last_scheduler_run = sync_time session.merge(orm_dag) session.commit() - for subdag in dag.subdags: - DAG.sync_to_db(subdag, owner, sync_time, session=session) + for subdag in self.subdags: + subdag.sync_to_db(owner=owner, sync_time=sync_time, session=session) @staticmethod @provide_session http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/028b3b88/airflow/utils/db.py ---------------------------------------------------------------------- diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b3c8a4d..c7e58e7 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -17,7 +17,6 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals -from datetime import datetime from functools import wraps import os @@ -281,9 +280,8 @@ def initdb(): dagbag = models.DagBag() 
# Save individual DAGs in the ORM - now = datetime.utcnow() for dag in dagbag.dags.values(): - models.DAG.sync_to_db(dag, dag.owner, now) + dag.sync_to_db() # Deactivate the unknown ones models.DAG.deactivate_unknown_dags(dagbag.dags.keys())
