This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6630357fd1 Adding support for owner links in the Dags view UI (#25280)
6630357fd1 is described below

commit 6630357fd1b798945dc538552dd03e5031870fec
Author: Alex Kruchkov <[email protected]>
AuthorDate: Sat Aug 6 10:10:14 2022 +0300

    Adding support for owner links in the Dags view UI (#25280)
    
    * Adding support for owner links in the Dags view UI
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 .../0116_2_4_0_add_dag_owner_attributes_table.py   |  52 ++++++++++++++++
 airflow/models/__init__.py                         |   3 +-
 airflow/models/abstractoperator.py                 |   1 -
 airflow/models/dag.py                              |  69 ++++++++++++++++++++-
 airflow/serialization/schema.json                  |   1 +
 airflow/www/templates/airflow/dag_details.html     |  10 +++
 airflow/www/templates/airflow/dags.html            |   4 ++
 airflow/www/views.py                               |  21 ++++++-
 docs/apache-airflow/howto/add-owner-links.rst      |  49 +++++++++++++++
 docs/apache-airflow/howto/index.rst                |   1 +
 docs/apache-airflow/img/howto-owner-links.gif      | Bin 0 -> 829619 bytes
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 docs/spelling_wordlist.txt                         |   1 +
 tests/models/test_dag.py                           |  41 ++++++++++--
 tests/test_utils/db.py                             |   2 +
 tests/utils/test_db_cleanup.py                     |   1 +
 tests/www/views/test_views_base.py                 |   2 +-
 17 files changed, 250 insertions(+), 12 deletions(-)

diff --git 
a/airflow/migrations/versions/0116_2_4_0_add_dag_owner_attributes_table.py 
b/airflow/migrations/versions/0116_2_4_0_add_dag_owner_attributes_table.py
new file mode 100644
index 0000000000..85020350f2
--- /dev/null
+++ b/airflow/migrations/versions/0116_2_4_0_add_dag_owner_attributes_table.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add dag_owner_attributes table
+
+Revision ID: 1486deb605b4
+Revises: f4ff391becb5
+Create Date: 2022-08-04 16:59:45.406589
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = '1486deb605b4'
+down_revision = 'f4ff391becb5'
+branch_labels = None
+depends_on = None
+airflow_version = '2.4.0'
+
+
+def upgrade():
+    """Apply Add ``DagOwnerAttributes`` table"""
+    op.create_table(
+        'dag_owner_attributes',
+        sa.Column('dag_id', sa.String(length=250), nullable=False),
+        sa.Column('owner', sa.String(length=500), nullable=False),
+        sa.Column('link', sa.String(length=500), nullable=False),
+        sa.ForeignKeyConstraint(['dag_id'], ['dag.dag_id'], 
ondelete='CASCADE'),
+        sa.PrimaryKeyConstraint('dag_id', 'owner'),
+    )
+
+
+def downgrade():
+    """Unapply Add Dataset model"""
+    op.drop_table('dag_owner_attributes')
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index 2a12cbba35..84b3399334 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -21,7 +21,7 @@ from typing import Union
 from airflow.models.base import ID_LEN, Base
 from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
 from airflow.models.connection import Connection
-from airflow.models.dag import DAG, DagModel, DagTag
+from airflow.models.dag import DAG, DagModel, DagOwnerAttributes, DagTag
 from airflow.models.dagbag import DagBag
 from airflow.models.dagpickle import DagPickle
 from airflow.models.dagrun import DagRun
@@ -58,6 +58,7 @@ __all__ = [
     "DagPickle",
     "DagRun",
     "DagTag",
+    "DagOwnerAttributes",
     "Dataset",
     "DbCallbackRequest",
     "ImportError",
diff --git a/airflow/models/abstractoperator.py 
b/airflow/models/abstractoperator.py
index 50e234def7..bae9322ef7 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -15,7 +15,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import inspect
 from typing import (
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index f85a45b200..664e680f1a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import collections
 import copy
 import functools
 import itertools
@@ -38,6 +38,7 @@ from typing import (
     Dict,
     FrozenSet,
     Iterable,
+    Iterator,
     List,
     NamedTuple,
     Optional,
@@ -49,6 +50,7 @@ from typing import (
     cast,
     overload,
 )
+from urllib.parse import urlsplit
 
 import jinja2
 import pendulum
@@ -312,6 +314,9 @@ class DAG(LoggingMixin):
         ``Environment`` is used to render templates as string values.
     :param tags: List of tags to help filtering DAGs in the UI.
     :param schedule_on: List of upstream datasets if for use in triggering DAG 
runs.
+    :param owner_links: Dict of owners and their links, that will be clickable 
on the DAGs view UI.
+        Can be used as an HTTP link (for example the link to your Slack 
channel), or a mailto link.
+        e.g: {"dag_owner": "https://airflow.apache.org/"}
     """
 
     _comps = {
@@ -372,12 +377,14 @@ class DAG(LoggingMixin):
         render_template_as_native_obj: bool = False,
         tags: Optional[List[str]] = None,
         schedule_on: Optional[Sequence["Dataset"]] = None,
+        owner_links: Optional[Dict[str, str]] = None,
     ):
         from airflow.utils.task_group import TaskGroup
 
         if tags and any(len(tag) > TAG_MAX_LEN for tag in tags):
             raise AirflowException(f"tag cannot be longer than {TAG_MAX_LEN} 
characters")
 
+        self.owner_links = owner_links if owner_links else {}
         self.user_defined_macros = user_defined_macros
         self.user_defined_filters = user_defined_filters
         if default_args and not isinstance(default_args, dict):
@@ -533,6 +540,12 @@ class DAG(LoggingMixin):
         self.tags = tags or []
         self._task_group = TaskGroup.create_root(self)
         self.validate_schedule_and_params()
+        wrong_links = dict(self.iter_invalid_owner_links())
+        if wrong_links:
+            raise AirflowException(
+                "Wrong link format was used for the owner. Use a valid link \n"
+                f"Bad formatted links are: {wrong_links}"
+            )
 
     def get_doc_md(self, doc_md: Optional[str]) -> Optional[str]:
         if doc_md is None:
@@ -2586,6 +2599,14 @@ class DAG(LoggingMixin):
                     orm_dag.tags.append(dag_tag_orm)
                     session.add(dag_tag_orm)
 
+            orm_dag_links = orm_dag.dag_owner_links or []
+            for orm_dag_link in orm_dag_links:
+                if orm_dag_link not in dag.owner_links:
+                    session.delete(orm_dag_link)
+            for owner_name, owner_link in dag.owner_links.items():
+                dag_owner_orm = DagOwnerAttributes(dag_id=dag.dag_id, 
owner=owner_name, link=owner_link)
+                session.add(dag_owner_orm)
+
         DagCode.bulk_sync_to_db(filelocs, session=session)
 
         from airflow.models.dataset import Dataset, DatasetDagRef, 
DatasetTaskRef
@@ -2810,6 +2831,19 @@ class DAG(LoggingMixin):
                     "DAG Schedule must be None, if there are any required 
params without default values"
                 )
 
+    def iter_invalid_owner_links(self) -> Iterator[Tuple[str, str]]:
+        """Parses a given link, and verifies if it's a valid URL, or a 
'mailto' link.
+        Returns an iterator of invalid (owner, link) pairs.
+        """
+        for owner, link in self.owner_links.items():
+            result = urlsplit(link)
+            if result.scheme == "mailto":
+                # netloc is not existing for 'mailto' link, so we are checking 
that the path is parsed
+                if not result.path:
+                    yield result.path, link
+            elif not result.scheme or not result.netloc:
+                yield owner, link
+
 
 class DagTag(Base):
     """A tag name per dag, to allow quick filtering in the DAG view."""
@@ -2826,6 +2860,33 @@ class DagTag(Base):
         return self.name
 
 
+class DagOwnerAttributes(Base):
+    """
+    Table defining different owner attributes. For example, a link for an 
owner that will be passed as
+    a hyperlink to the DAGs view
+    """
+
+    __tablename__ = "dag_owner_attributes"
+    dag_id = Column(
+        StringID(),
+        ForeignKey('dag.dag_id', name='dag.dag_id', ondelete='CASCADE'),
+        nullable=False,
+        primary_key=True,
+    )
+    owner = Column(String(500), primary_key=True, nullable=False)
+    link = Column(String(500), nullable=False)
+
+    def __repr__(self):
+        return f"<DagOwnerAttributes: dag_id={self.dag_id}, 
owner={self.owner}, link={self.link}>"
+
+    @classmethod
+    def get_all(cls, session) -> Dict[str, Dict[str, str]]:
+        dag_links: dict = collections.defaultdict(dict)
+        for obj in session.query(cls):
+            dag_links[obj.dag_id].update({obj.owner: obj.link})
+        return dag_links
+
+
 class DagModel(Base):
     """Table containing DAG properties"""
 
@@ -2871,6 +2932,10 @@ class DagModel(Base):
     timetable_description = Column(String(1000), nullable=True)
     # Tags for view filter
     tags = relationship('DagTag', cascade='all, delete, delete-orphan', 
backref=backref('dag'))
+    # Dag owner links for DAGs view
+    dag_owner_links = relationship(
+        'DagOwnerAttributes', cascade='all, delete, delete-orphan', 
backref=backref('dag')
+    )
 
     max_active_tasks = Column(Integer, nullable=False)
     max_active_runs = Column(Integer, nullable=True)
@@ -3157,6 +3222,7 @@ def dag(
     render_template_as_native_obj: bool = False,
     tags: Optional[List[str]] = None,
     schedule_on: Optional[Sequence["Dataset"]] = None,
+    owner_links: Optional[Dict[str, str]] = None,
 ) -> Callable[[Callable], Callable[..., DAG]]:
     """
     Python dag decorator. Wraps a function into an Airflow DAG.
@@ -3208,6 +3274,7 @@ def dag(
                 render_template_as_native_obj=render_template_as_native_obj,
                 tags=tags,
                 schedule_on=schedule_on,
+                owner_links=owner_links,
             ) as dag_obj:
                 # Set DAG documentation from function documentation.
                 if f.__doc__:
diff --git a/airflow/serialization/schema.json 
b/airflow/serialization/schema.json
index f9df99eb58..80d579f567 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -127,6 +127,7 @@
                 }
             ]
         },
+        "owner_links": { "type": "object" },
         "timetable": {
           "type": "object",
           "properties": {
diff --git a/airflow/www/templates/airflow/dag_details.html 
b/airflow/www/templates/airflow/dag_details.html
index e721878ab3..986a7d0e58 100644
--- a/airflow/www/templates/airflow/dag_details.html
+++ b/airflow/www/templates/airflow/dag_details.html
@@ -95,6 +95,16 @@
       <th>Owner</th>
       <td>{{ dag.owner }}</td>
     </tr>
+    <tr>
+      <th>Owner Links</th>
+      <td>
+        {% if dag.owner_links %}
+          {{ dag.owner_links }}
+        {% else %}
+          None
+        {% endif %}
+      </td>
+    </tr>
     <tr>
       <th>DAG Run Timeout</th>
       <td>{{ dag.dagrun_timeout }}</td>
diff --git a/airflow/www/templates/airflow/dags.html 
b/airflow/www/templates/airflow/dags.html
index 32ed7e9f8a..92132ab517 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -257,7 +257,11 @@
             <td>
               {% for owner in dag.owners.split(",") %}
                 <a class="label label-default"
+                 {% if owner_links and owner.strip() in 
owner_links.get(dag.dag_id, {}) %}
+                   href="{{ owner_links[dag.dag_id].get(owner.strip()) }}" 
target="_blank"
+                 {% else %}
                    href="?search={{ owner | trim }}"
+                 {% endif %}
                    style="margin: 3px 6px 3px 0;">
                   {{ owner | trim }}
                 </a>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index ea401ac914..4e3a7b9e36 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -105,7 +105,17 @@ from airflow.executors.executor_loader import 
ExecutorLoader
 from airflow.jobs.base_job import BaseJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.jobs.triggerer_job import TriggererJob
-from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, 
TaskFail, XCom, errors
+from airflow.models import (
+    Connection,
+    DagModel,
+    DagOwnerAttributes,
+    DagTag,
+    Log,
+    SlaMiss,
+    TaskFail,
+    XCom,
+    errors,
+)
 from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.dag import DAG, get_dataset_triggered_next_run_info
 from airflow.models.dagcode import DagCode
@@ -882,6 +892,8 @@ class Airflow(AirflowBaseView):
                 for name, in dagtags
             ]
 
+            owner_links_dict = DagOwnerAttributes.get_all(session)
+
             import_errors = 
session.query(errors.ImportError).order_by(errors.ImportError.id)
 
             if (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG) not in 
user_permissions:
@@ -973,6 +985,7 @@ class Airflow(AirflowBaseView):
             ),
             num_runs=num_runs,
             tags=tags,
+            owner_links=owner_links_dict,
             state_color=state_color_mapping,
             status_filter=arg_status_filter,
             status_count_all=all_dags_count,
@@ -1307,6 +1320,10 @@ class Airflow(AirflowBaseView):
 
         tags = session.query(models.DagTag).filter(models.DagTag.dag_id == 
dag_id).all()
 
+        owner_links = (
+            
session.query(models.DagOwnerAttributes).filter(models.DagOwnerAttributes.dag_id
 == dag_id).all()
+        )
+
         attrs_to_avoid = [
             "NUM_DAGS_PER_DAGRUN_QUERY",
             "serialized_dag",
@@ -1319,6 +1336,7 @@ class Airflow(AirflowBaseView):
             "max_active_tasks",
             "schedule_interval",
             "owners",
+            "dag_owner_links",
             "is_paused",
         ]
         attrs_to_avoid.extend(wwwutils.get_attr_renderer().keys())
@@ -1342,6 +1360,7 @@ class Airflow(AirflowBaseView):
             State=State,
             active_runs=active_runs,
             tags=tags,
+            owner_links=owner_links,
             dag_model_attrs=dag_model_attrs,
         )
 
diff --git a/docs/apache-airflow/howto/add-owner-links.rst 
b/docs/apache-airflow/howto/add-owner-links.rst
new file mode 100644
index 0000000000..0dd67169db
--- /dev/null
+++ b/docs/apache-airflow/howto/add-owner-links.rst
@@ -0,0 +1,49 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Add Owner Links to DAG
+=======================
+
+.. versionadded:: 2.4.0
+
+You can pass the ``owner_link`` parameter for your DAG object, which will make 
the owner to become a clickable link
+in the main DAGs view page.
+You can use it to set a custom HTTP link (for example, the owner's Slack 
channel), or use a
+`maitlo <https://en.wikipedia.org/wiki/Mailto>`_ link to have an automated 
email message (up to 500 characters).
+
+Example:
+In your DAG file, add a ``owners_link`` parameter to the DAG object that will 
hold a dictionary of an owner and it's link.
+After that, define a task that will use this owner, and the link in the DAGs 
view will become clickable
+
+.. code-block:: python
+
+  dag = DAG(
+      dag_id="example_dag_owners",
+      schedule_interval="0 0 * * *",
+      start_date=datetime(2022, 8, 5),
+      owner_links={"airflow": "https://airflow.apache.org/"},
+  )
+
+  with dag:
+      bash_task = BashOperator(task_id='task_using_linked_owner', 
bash_command='echo 1', owner='airflow')
+
+**Screenshot**:
+
+.. image:: ../img/howto-owner-links.gif
diff --git a/docs/apache-airflow/howto/index.rst 
b/docs/apache-airflow/howto/index.rst
index d2717ce8cb..6b8a638915 100644
--- a/docs/apache-airflow/howto/index.rst
+++ b/docs/apache-airflow/howto/index.rst
@@ -30,6 +30,7 @@ configuring an Airflow environment.
     :maxdepth: 2
 
     add-dag-tags
+    add-owner-links
     set-config
     set-up-database
     operator/index
diff --git a/docs/apache-airflow/img/howto-owner-links.gif 
b/docs/apache-airflow/img/howto-owner-links.gif
new file mode 100644
index 0000000000..46b687341a
Binary files /dev/null and b/docs/apache-airflow/img/howto-owner-links.gif 
differ
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index a182eeec2d..28b5928738 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -27,7 +27,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | 
Description                                                  |
 
+=================================+===================+===================+==============================================================+
-| ``f4ff391becb5`` (head)         | ``0038cd0c28b4``  | ``2.4.0``         | 
Remove smart sensors                                         |
+| ``1486deb605b4`` (head)         | ``f4ff391becb5``  | ``2.4.0``         | 
add dag_owner_attributes table                               |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``f4ff391becb5``                | ``0038cd0c28b4``  | ``2.4.0``         | 
Remove smart sensors                                         |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``0038cd0c28b4``                | ``44b7034f6bdc``  | ``2.4.0``         | 
Add Dataset model                                            |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index c54e7ee3fd..e4d2c98602 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1052,6 +1052,7 @@ lshift
 lxml
 macOS
 macosx
+mailto
 makedirs
 makedsn
 mapred
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 25cfa15e04..865e31c670 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -46,7 +46,7 @@ from airflow.decorators import task as task_decorator
 from airflow.exceptions import AirflowException, DuplicateTaskIdFound, 
ParamValidationError
 from airflow.models import DAG, DagModel, DagRun, DagTag, TaskFail, 
TaskInstance as TI
 from airflow.models.baseoperator import BaseOperator
-from airflow.models.dag import dag as dag_decorator, 
get_dataset_triggered_next_run_info
+from airflow.models.dag import DagOwnerAttributes, dag as dag_decorator, 
get_dataset_triggered_next_run_info
 from airflow.models.dataset import Dataset, DatasetDagRunQueue, DatasetTaskRef
 from airflow.models.param import DagParam, Param, ParamsDict
 from airflow.operators.bash import BashOperator
@@ -700,14 +700,14 @@ class TestDag(unittest.TestCase):
                 assert row[0] is not None
 
         # Re-sync should do fewer queries
-        with assert_queries_count(4):
+        with assert_queries_count(8):
             DAG.bulk_write_to_db(dags)
-        with assert_queries_count(4):
+        with assert_queries_count(8):
             DAG.bulk_write_to_db(dags)
         # Adding tags
         for dag in dags:
             dag.tags.append("test-dag2")
-        with assert_queries_count(5):
+        with assert_queries_count(9):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 
'dag-bulk-sync-3'} == {
@@ -726,7 +726,7 @@ class TestDag(unittest.TestCase):
         # Removing tags
         for dag in dags:
             dag.tags.remove("test-dag")
-        with assert_queries_count(5):
+        with assert_queries_count(9):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 
'dag-bulk-sync-3'} == {
@@ -745,7 +745,7 @@ class TestDag(unittest.TestCase):
         # Removing all tags
         for dag in dags:
             dag.tags = None
-        with assert_queries_count(5):
+        with assert_queries_count(9):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 
'dag-bulk-sync-3'} == {
@@ -1991,6 +1991,35 @@ class TestDag(unittest.TestCase):
             start_date + 2 * delta,
         ]
 
+    def test_dag_owner_links(self):
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            owner_links={"owner1": "https://mylink.com";, "owner2": 
"mailto:[email protected]"},
+        )
+
+        assert dag.owner_links == {"owner1": "https://mylink.com";, "owner2": 
"mailto:[email protected]"}
+        session = settings.Session()
+        dag.sync_to_db(session=session)
+
+        expected_owners = {'dag': {'owner1': 'https://mylink.com', 'owner2': 
'mailto:[email protected]'}}
+        orm_dag_owners = DagOwnerAttributes.get_all(session)
+        assert orm_dag_owners == expected_owners
+
+        # Test dag owner links are removed completely
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+        )
+        dag.sync_to_db(session=session)
+
+        orm_dag_owners = session.query(DagOwnerAttributes).all()
+        assert not orm_dag_owners
+
+        # Check wrong formatted owner link
+        with pytest.raises(AirflowException):
+            DAG('dag', start_date=DEFAULT_DATE, owner_links={"owner1": 
"my-bad-link"})
+
 
 class TestDagModel:
     def test_dags_needing_dagruns_not_too_early(self):
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index 6c78c87c3a..2f49604262 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -20,6 +20,7 @@ from airflow.jobs.triggerer_job import TriggererJob
 from airflow.models import (
     Connection,
     DagModel,
+    DagOwnerAttributes,
     DagRun,
     DagTag,
     DagWarning,
@@ -65,6 +66,7 @@ def clear_db_datasets():
 def clear_db_dags():
     with create_session() as session:
         session.query(DagTag).delete()
+        session.query(DagOwnerAttributes).delete()
         session.query(DagModel).delete()
 
 
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index a7b48ff7e9..2439bd9703 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -258,6 +258,7 @@ class TestDBCleanup:
             'serialized_dag',  # handled through FK to Dag
             'log_template',  # not a significant source of data; age not 
indicative of staleness
             'dag_tag',  # not a significant source of data; age not indicative 
of staleness,
+            'dag_owner_attributes',  # not a significant source of data; age 
not indicative of staleness,
             'dag_pickle',  # unsure of consequences
             'dag_code',  # self-maintaining
             'dag_warning',  # self-maintaining
diff --git a/tests/www/views/test_views_base.py 
b/tests/www/views/test_views_base.py
index 3fd2a5ff3c..31ccf74286 100644
--- a/tests/www/views/test_views_base.py
+++ b/tests/www/views/test_views_base.py
@@ -30,7 +30,7 @@ from tests.test_utils.www import check_content_in_response, 
check_content_not_in
 
 
 def test_index(admin_client):
-    with assert_queries_count(15):
+    with assert_queries_count(16):
         resp = admin_client.get('/', follow_redirects=True)
     check_content_in_response('DAGs', resp)
 

Reply via email to