This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new e367231  Add UniqueConnIdRule rule and unittest (#11222)
e367231 is described below

commit e367231ae91d5fe58b466bba4e1396a7f05983d4
Author: Eden Federman <[email protected]>
AuthorDate: Mon Oct 5 11:19:52 2020 +0300

    Add UniqueConnIdRule rule and unittest (#11222)
---
 airflow/upgrade/rules/conn_id_is_unique.py    | 45 +++++++++++++++++++++++++++
 tests/upgrade/rules/test_conn_id_is_unique.py | 43 +++++++++++++++++++++++++
 2 files changed, 88 insertions(+)

diff --git a/airflow/upgrade/rules/conn_id_is_unique.py b/airflow/upgrade/rules/conn_id_is_unique.py
new file mode 100644
index 0000000..8e1e474
--- /dev/null
+++ b/airflow/upgrade/rules/conn_id_is_unique.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+from sqlalchemy import func
+from airflow.models import Connection
+from airflow.upgrade.rules.base_rule import BaseRule
+from airflow.utils.db import provide_session
+
+
+class UniqueConnIdRule(BaseRule):
+    """Upgrade rule that reports ``conn_id`` values used by more than one connection.
+
+    The check groups ``Connection`` rows by ``conn_id`` and reports every value
+    whose group contains more than one row.
+    """
+
+    title = "Connection.conn_id is not unique"
+
+    description = """\
+The `conn_id` column in the `connection` table must be unique. Previously, this rule was \
+enforced by application logic, but was not enforced by the database schema.
+
+If you made any modifications to the table directly, make sure you don't have \
+duplicate values in conn_id column.
+    """
+
+    @provide_session
+    def check(self, session=None):
+        """Return one message for every ``conn_id`` that occurs more than once.
+
+        :param session: SQLAlchemy session used to run the query. Defaults to
+            ``None``; presumably ``provide_session`` supplies one in that case
+            (confirm against ``airflow.utils.db``).
+        :return: list of messages, empty when no ``conn_id`` is duplicated
+        """
+        duplicated = (
+            session.query(Connection.conn_id)
+            .group_by(Connection.conn_id)
+            .having(func.count() > 1)
+        )
+        # Each result row is a one-element tuple; unpack it so the message shows
+        # the bare conn_id rather than the tuple's repr, e.g. "('my_conn',)".
+        # Building a list runs the query inside this method instead of lazily
+        # when the caller iterates the result after the method has returned.
+        return [
+            'Connection.conn_id={} is not unique.'.format(conn_id)
+            for (conn_id,) in duplicated
+        ]
diff --git a/tests/upgrade/rules/test_conn_id_is_unique.py b/tests/upgrade/rules/test_conn_id_is_unique.py
new file mode 100644
index 0000000..22d9637
--- /dev/null
+++ b/tests/upgrade/rules/test_conn_id_is_unique.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import TestCase
+
+from airflow.models import Connection
+from airflow.upgrade.rules.conn_id_is_unique import UniqueConnIdRule
+from airflow.utils.db import create_session
+from tests.test_utils.db import clear_db_connections
+
+
+class TestUniqueConnIdRule(TestCase):
+    """Checks that UniqueConnIdRule reports a conn_id stored on two connections."""
+
+    def tearDown(self):
+        # Presumably removes the connection rows this test created, so the
+        # duplicates do not leak into other tests (see tests.test_utils.db).
+        clear_db_connections()
+
+    def test_check(self):
+        rule = UniqueConnIdRule()
+
+        assert isinstance(rule.description, str)
+        assert isinstance(rule.title, str)
+
+        with create_session() as session:
+            conn1 = Connection(conn_id="UniqueConnIdRule")
+            conn2 = Connection(conn_id="UniqueConnIdRule")
+            session.merge(conn1)
+            session.merge(conn2)
+            # Send the pending rows to the database explicitly rather than
+            # relying on the session's autoflush setting.
+            session.flush()
+
+            # Run the rule while the session's ``with`` block is still active,
+            # and materialise the result so the query executes here.
+            msgs = list(rule.check(session=session))
+
+        assert any("UniqueConnIdRule" in m for m in msgs), \
+            "UniqueConnIdRule not in warning messages"

Reply via email to