This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new e367231 Add UniqueConnIdRule rule and unittest (#11222)
e367231 is described below
commit e367231ae91d5fe58b466bba4e1396a7f05983d4
Author: Eden Federman <[email protected]>
AuthorDate: Mon Oct 5 11:19:52 2020 +0300
Add UniqueConnIdRule rule and unittest (#11222)
---
airflow/upgrade/rules/conn_id_is_unique.py | 45 +++++++++++++++++++++++++++
tests/upgrade/rules/test_conn_id_is_unique.py | 43 +++++++++++++++++++++++++
2 files changed, 88 insertions(+)
diff --git a/airflow/upgrade/rules/conn_id_is_unique.py b/airflow/upgrade/rules/conn_id_is_unique.py
new file mode 100644
index 0000000..8e1e474
--- /dev/null
+++ b/airflow/upgrade/rules/conn_id_is_unique.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+from sqlalchemy import func
+from airflow.models import Connection
+from airflow.upgrade.rules.base_rule import BaseRule
+from airflow.utils.db import provide_session
+
+
+class UniqueConnIdRule(BaseRule):
+ title = "Connection.conn_id is not unique"
+
+ description = """\
+The `id` column in the `connection` table must be unique. Previously, this
rule was \
+enforced by application logic, but was not enforced by the database schema.
+
+If you made any modifications to the table directly, make sure you don't have \
+duplicate values in conn_id column.
+ """
+
+ @provide_session
+ def check(self, session=None):
+ invalid_connections = session.query(Connection.conn_id)\
+ .group_by(Connection.conn_id)\
+ .having(func.count() > 1)
+ return (
+ 'Connection.conn_id={} is not unique.'.format(conn_id)
+ for conn_id in invalid_connections
+ )
diff --git a/tests/upgrade/rules/test_conn_id_is_unique.py b/tests/upgrade/rules/test_conn_id_is_unique.py
new file mode 100644
index 0000000..22d9637
--- /dev/null
+++ b/tests/upgrade/rules/test_conn_id_is_unique.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import TestCase
+
+from airflow.models import Connection
+from airflow.upgrade.rules.conn_id_is_unique import UniqueConnIdRule
+from airflow.utils.db import create_session
+from tests.test_utils.db import clear_db_connections
+
+
+class TestUniqueConnIdRule(TestCase):
+ def tearDown(self):
+ clear_db_connections()
+
+ def test_check(self):
+ rule = UniqueConnIdRule()
+
+ assert isinstance(rule.description, str)
+ assert isinstance(rule.title, str)
+
+ with create_session() as session:
+ conn1 = Connection(conn_id="UniqueConnIdRule")
+ conn2 = Connection(conn_id="UniqueConnIdRule")
+ session.merge(conn1)
+ session.merge(conn2)
+
+ msgs = rule.check(session=session)
+ assert [m for m in msgs if "UniqueConnIdRule" in m], \
+ "UniqueConnIdRule not in warning messages"