This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 112f7d7  Add creating_job_id to DagRun table (#11396)
112f7d7 is described below

commit 112f7d716900556a7a41e3a8eea197f6bcfe9ed9
Author: Tomek Urbaszek <[email protected]>
AuthorDate: Sat Oct 17 12:31:07 2020 +0200

    Add creating_job_id to DagRun table (#11396)
    
    This PR introduces a creating_job_id column in the DagRun table that links a
    DagRun to the job that created it. Part of #11302
    
    Co-authored-by: Kaxil Naik <[email protected]>
---
 airflow/jobs/backfill_job.py                       |  1 +
 airflow/jobs/base_job.py                           |  9 +++++
 airflow/jobs/scheduler_job.py                      |  3 +-
 .../364159666cbd_add_job_id_to_dagrun_table.py     | 44 ++++++++++++++++++++++
 airflow/models/dag.py                              |  8 +++-
 airflow/models/dagrun.py                           |  3 ++
 tests/jobs/test_backfill_job.py                    | 14 +++++++
 tests/jobs/test_scheduler_job.py                   | 27 +++++++++++++
 tests/models/test_dag.py                           |  8 ++++
 9 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 4949f58..de7c53b 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -325,6 +325,7 @@ class BackfillJob(BaseJob):
             session=session,
             conf=self.conf,
             run_type=DagRunType.BACKFILL_JOB,
+            creating_job_id=self.id,
         )
 
         # set required transient field
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 2e37b0d..a6b418a 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -29,6 +29,7 @@ from sqlalchemy.orm.session import make_transient
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.executors.executor_loader import ExecutorLoader
+from airflow.models import DagRun
 from airflow.models.base import ID_LEN, Base
 from airflow.models.taskinstance import TaskInstance
 from airflow.stats import Stats
@@ -78,6 +79,14 @@ class BaseJob(Base, LoggingMixin):
         foreign_keys=id,
         backref=backref('queued_by_job', uselist=False),
     )
+
+    dag_runs = relationship(
+        DagRun,
+        primaryjoin=id == DagRun.creating_job_id,
+        foreign_keys=id,
+        backref=backref('creating_job'),
+    )
+
     """
     TaskInstances which have been enqueued by this Job.
 
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2425962..ba472c4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1563,7 +1563,8 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                 state=State.RUNNING,
                 external_trigger=False,
                 session=session,
-                dag_hash=dag_hash
+                dag_hash=dag_hash,
+                creating_job_id=self.id,
             )
 
         self._update_dag_next_dagruns(dag_models, session)
diff --git a/airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py b/airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py
new file mode 100644
index 0000000..b4e1fae
--- /dev/null
+++ b/airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add creating_job_id to DagRun table
+
+Revision ID: 364159666cbd
+Revises: 849da589634d
+Create Date: 2020-10-10 09:08:07.332456
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = '364159666cbd'
+down_revision = '849da589634d'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add creating_job_id to DagRun table"""
+    op.add_column('dag_run', sa.Column('creating_job_id', sa.Integer))
+
+
+def downgrade():
+    """Unapply Add creating_job_id to DagRun table"""
+    op.drop_column('dag_run', 'creating_job_id')
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 45269f3..1d928d2 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1655,7 +1655,8 @@ class DAG(BaseDag, LoggingMixin):
         conf=None,
         run_type=None,
         session=None,
-        dag_hash=None
+        dag_hash=None,
+        creating_job_id=None,
     ):
         """
         Creates a dag run from this dag including the tasks associated with this dag.
@@ -1675,6 +1676,8 @@ class DAG(BaseDag, LoggingMixin):
         :type external_trigger: bool
         :param conf: Dict containing configuration/parameters to pass to the DAG
         :type conf: dict
+        :param creating_job_id: id of the job creating this DagRun
+        :type creating_job_id: int
         :param session: database session
         :type session: sqlalchemy.orm.session.Session
         :param dag_hash: Hash of Serialized DAG
@@ -1702,7 +1705,8 @@ class DAG(BaseDag, LoggingMixin):
             conf=conf,
             state=state,
             run_type=run_type.value,
-            dag_hash=dag_hash
+            dag_hash=dag_hash,
+            creating_job_id=creating_job_id
         )
         session.add(run)
         session.flush()
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 39e2348..07d83c5 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -58,6 +58,7 @@ class DagRun(Base, LoggingMixin):
     end_date = Column(UtcDateTime)
     _state = Column('state', String(50), default=State.RUNNING)
     run_id = Column(String(ID_LEN))
+    creating_job_id = Column(Integer)
     external_trigger = Column(Boolean, default=True)
     run_type = Column(String(50), nullable=False)
     conf = Column(PickleType)
@@ -98,6 +99,7 @@ class DagRun(Base, LoggingMixin):
         state: Optional[str] = None,
         run_type: Optional[str] = None,
         dag_hash: Optional[str] = None,
+        creating_job_id: Optional[int] = None,
     ):
         self.dag_id = dag_id
         self.run_id = run_id
@@ -108,6 +110,7 @@ class DagRun(Base, LoggingMixin):
         self.state = state
         self.run_type = run_type
         self.dag_hash = dag_hash
+        self.creating_job_id = creating_job_id
         super().__init__()
 
     def __repr__(self):
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index ceb571c..3041ad0 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1536,3 +1536,17 @@ class TestBackfillJob(unittest.TestCase):
         ti2.refresh_from_db(session=session)
         self.assertEqual(State.SCHEDULED, ti1.state)
         self.assertEqual(State.NONE, ti2.state)
+
+    def test_job_id_is_assigned_to_dag_run(self):
+        dag_id = 'test_job_id_is_assigned_to_dag_run'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+        DummyOperator(task_id="dummy_task", dag=dag)
+
+        job = BackfillJob(
+            dag=dag,
+            executor=MockExecutor(),
+            start_date=datetime.datetime.now() - datetime.timedelta(days=1)
+        )
+        job.run()
+        dr: DagRun = dag.get_last_dagrun()
+        assert dr.creating_job_id == job.id
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 14e4a4f..2e2b56b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3488,6 +3488,33 @@ class TestSchedulerJob(unittest.TestCase):
                 full_filepath=dag.fileloc, dag_id=dag_id
             )
 
+    def test_scheduler_sets_job_id_on_dag_run(self):
+        dag = DAG(
+            dag_id='test_scheduler_sets_job_id_on_dag_run',
+            start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag,
+        )
+
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True
+        )
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db()
+        dag_model = DagModel.get_dagmodel(dag.dag_id)
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+        scheduler.processor_agent = mock.MagicMock()
+
+        with create_session() as session:
+            scheduler._create_dag_runs([dag_model], session)
+
+        assert dag.get_last_dagrun().creating_job_id == scheduler.id
+
 
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 3280922..afe42e3 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1348,6 +1348,14 @@ class TestDag(unittest.TestCase):
         dr = dag.create_dagrun(run_id="custom_is_set_to_manual", state=State.NONE)
         assert dr.run_type == DagRunType.MANUAL.value
 
+    def test_create_dagrun_job_id_is_set(self):
+        job_id = 42
+        dag = DAG(dag_id="test_create_dagrun_job_id_is_set")
+        dr = dag.create_dagrun(
+            run_id="test_create_dagrun_job_id_is_set", state=State.NONE, creating_job_id=job_id
+        )
+        assert dr.creating_job_id == job_id
+
     @parameterized.expand(
         [
             (State.NONE,),

Reply via email to