This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 112f7d7 Add creating_job_id to DagRun table (#11396)
112f7d7 is described below
commit 112f7d716900556a7a41e3a8eea197f6bcfe9ed9
Author: Tomek Urbaszek <[email protected]>
AuthorDate: Sat Oct 17 12:31:07 2020 +0200
Add creating_job_id to DagRun table (#11396)
This PR introduces creating_job_id column in DagRun table that links a
DagRun to job that created it. Part of #11302
Co-authored-by: Kaxil Naik <[email protected]>
---
airflow/jobs/backfill_job.py | 1 +
airflow/jobs/base_job.py | 9 +++++
airflow/jobs/scheduler_job.py | 3 +-
.../364159666cbd_add_job_id_to_dagrun_table.py | 44 ++++++++++++++++++++++
airflow/models/dag.py | 8 +++-
airflow/models/dagrun.py | 3 ++
tests/jobs/test_backfill_job.py | 14 +++++++
tests/jobs/test_scheduler_job.py | 27 +++++++++++++
tests/models/test_dag.py | 8 ++++
9 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 4949f58..de7c53b 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -325,6 +325,7 @@ class BackfillJob(BaseJob):
session=session,
conf=self.conf,
run_type=DagRunType.BACKFILL_JOB,
+ creating_job_id=self.id,
)
# set required transient field
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 2e37b0d..a6b418a 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -29,6 +29,7 @@ from sqlalchemy.orm.session import make_transient
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
+from airflow.models import DagRun
from airflow.models.base import ID_LEN, Base
from airflow.models.taskinstance import TaskInstance
from airflow.stats import Stats
@@ -78,6 +79,14 @@ class BaseJob(Base, LoggingMixin):
foreign_keys=id,
backref=backref('queued_by_job', uselist=False),
)
+
+ dag_runs = relationship(
+ DagRun,
+ primaryjoin=id == DagRun.creating_job_id,
+ foreign_keys=id,
+ backref=backref('creating_job'),
+ )
+
"""
TaskInstances which have been enqueued by this Job.
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2425962..ba472c4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1563,7 +1563,8 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
state=State.RUNNING,
external_trigger=False,
session=session,
- dag_hash=dag_hash
+ dag_hash=dag_hash,
+ creating_job_id=self.id,
)
self._update_dag_next_dagruns(dag_models, session)
diff --git a/airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py b/airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py
new file mode 100644
index 0000000..b4e1fae
--- /dev/null
+++ b/airflow/migrations/versions/364159666cbd_add_job_id_to_dagrun_table.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add creating_job_id to DagRun table
+
+Revision ID: 364159666cbd
+Revises: 849da589634d
+Create Date: 2020-10-10 09:08:07.332456
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = '364159666cbd'
+down_revision = '849da589634d'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply Add creating_job_id to DagRun table"""
+ op.add_column('dag_run', sa.Column('creating_job_id', sa.Integer))
+
+
+def downgrade():
+ """Unapply Add creating_job_id to DagRun table"""
+ op.drop_column('dag_run', 'creating_job_id')
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 45269f3..1d928d2 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1655,7 +1655,8 @@ class DAG(BaseDag, LoggingMixin):
conf=None,
run_type=None,
session=None,
- dag_hash=None
+ dag_hash=None,
+ creating_job_id=None,
):
"""
Creates a dag run from this dag including the tasks associated with this dag.
@@ -1675,6 +1676,8 @@ class DAG(BaseDag, LoggingMixin):
:type external_trigger: bool
+ :param conf: Dict containing configuration/parameters to pass to the DAG
:type conf: dict
+ :param creating_job_id: id of the job creating this DagRun
+ :type creating_job_id: int
:param session: database session
:type session: sqlalchemy.orm.session.Session
:param dag_hash: Hash of Serialized DAG
@@ -1702,7 +1705,8 @@ class DAG(BaseDag, LoggingMixin):
conf=conf,
state=state,
run_type=run_type.value,
- dag_hash=dag_hash
+ dag_hash=dag_hash,
+ creating_job_id=creating_job_id
)
session.add(run)
session.flush()
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 39e2348..07d83c5 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -58,6 +58,7 @@ class DagRun(Base, LoggingMixin):
end_date = Column(UtcDateTime)
_state = Column('state', String(50), default=State.RUNNING)
run_id = Column(String(ID_LEN))
+ creating_job_id = Column(Integer)
external_trigger = Column(Boolean, default=True)
run_type = Column(String(50), nullable=False)
conf = Column(PickleType)
@@ -98,6 +99,7 @@ class DagRun(Base, LoggingMixin):
state: Optional[str] = None,
run_type: Optional[str] = None,
dag_hash: Optional[str] = None,
+ creating_job_id: Optional[int] = None,
):
self.dag_id = dag_id
self.run_id = run_id
@@ -108,6 +110,7 @@ class DagRun(Base, LoggingMixin):
self.state = state
self.run_type = run_type
self.dag_hash = dag_hash
+ self.creating_job_id = creating_job_id
super().__init__()
def __repr__(self):
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index ceb571c..3041ad0 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1536,3 +1536,17 @@ class TestBackfillJob(unittest.TestCase):
ti2.refresh_from_db(session=session)
self.assertEqual(State.SCHEDULED, ti1.state)
self.assertEqual(State.NONE, ti2.state)
+
+ def test_job_id_is_assigned_to_dag_run(self):
+ dag_id = 'test_job_id_is_assigned_to_dag_run'
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+ DummyOperator(task_id="dummy_task", dag=dag)
+
+ job = BackfillJob(
+ dag=dag,
+ executor=MockExecutor(),
+ start_date=datetime.datetime.now() - datetime.timedelta(days=1)
+ )
+ job.run()
+ dr: DagRun = dag.get_last_dagrun()
+ assert dr.creating_job_id == job.id
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 14e4a4f..2e2b56b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3488,6 +3488,33 @@ class TestSchedulerJob(unittest.TestCase):
full_filepath=dag.fileloc, dag_id=dag_id
)
+ def test_scheduler_sets_job_id_on_dag_run(self):
+ dag = DAG(
+ dag_id='test_scheduler_sets_job_id_on_dag_run',
+ start_date=DEFAULT_DATE)
+
+ DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ )
+
+ dagbag = DagBag(
+ dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+ include_examples=False,
+ read_dags_from_db=True
+ )
+ dagbag.bag_dag(dag=dag, root_dag=dag)
+ dagbag.sync_to_db()
+ dag_model = DagModel.get_dagmodel(dag.dag_id)
+
+ scheduler = SchedulerJob(executor=self.null_exec)
+ scheduler.processor_agent = mock.MagicMock()
+
+ with create_session() as session:
+ scheduler._create_dag_runs([dag_model], session)
+
+ assert dag.get_last_dagrun().creating_job_id == scheduler.id
+
@pytest.mark.xfail(reason="Work out where this goes")
def test_task_with_upstream_skip_process_task_instances():
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 3280922..afe42e3 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1348,6 +1348,14 @@ class TestDag(unittest.TestCase):
dr = dag.create_dagrun(run_id="custom_is_set_to_manual",
state=State.NONE)
assert dr.run_type == DagRunType.MANUAL.value
+ def test_create_dagrun_job_id_is_set(self):
+ job_id = 42
+ dag = DAG(dag_id="test_create_dagrun_job_id_is_set")
+ dr = dag.create_dagrun(
+ run_id="test_create_dagrun_job_id_is_set", state=State.NONE, creating_job_id=job_id
+ )
+ assert dr.creating_job_id == job_id
+
@parameterized.expand(
[
(State.NONE,),