This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3ddb365 Duplicate Connection: Added logic to query if a connection id exists before creating one (#18161)
3ddb365 is described below
commit 3ddb36578c5020408f89f5532b21dc0c38e739fb
Author: Kanthi <[email protected]>
AuthorDate: Sat Oct 9 10:06:11 2021 -0400
Duplicate Connection: Added logic to query if a connection id exists before creating one (#18161)
---
airflow/www/views.py | 65 +++++++++++++++++++++-----------
tests/www/views/test_views_connection.py | 32 ++++++++++++++++
2 files changed, 75 insertions(+), 22 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2dca5fd..35cc590 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3413,34 +3413,55 @@ class ConnectionModelView(AirflowModelView):
for selected_conn in connections:
new_conn_id = selected_conn.conn_id
match = re.search(r"_copy(\d+)$", selected_conn.conn_id)
+
+ base_conn_id = selected_conn.conn_id
if match:
- conn_id_prefix = selected_conn.conn_id[: match.start()]
- new_conn_id = f"{conn_id_prefix}_copy{int(match.group(1)) + 1}"
- else:
- new_conn_id += '_copy1'
-
- dup_conn = Connection(
- new_conn_id,
- selected_conn.conn_type,
- selected_conn.description,
- selected_conn.host,
- selected_conn.login,
- selected_conn.password,
- selected_conn.schema,
- selected_conn.port,
- selected_conn.extra,
- )
+ base_conn_id = base_conn_id.split('_copy')[0]
+
+ potential_connection_ids = [f"{base_conn_id}_copy{i}" for i in range(1, 11)]
+ query = session.query(Connection.conn_id).filter(Connection.conn_id.in_(potential_connection_ids))
+
+ found_conn_id_set = {conn_id for conn_id, in query}
+
+ possible_conn_id_iter = (
+ connection_id
+ for connection_id in potential_connection_ids
+ if connection_id not in found_conn_id_set
+ )
try:
- session.add(dup_conn)
- session.commit()
- flash(f"Connection {new_conn_id} added successfully.", "success")
- except IntegrityError:
+ new_conn_id = next(possible_conn_id_iter)
+ except StopIteration:
flash(
- f"Connection {new_conn_id} can't be added. Integrity error, probably unique constraint.",
+ f"Connection {new_conn_id} can't be added because it already exists, "
+ f"Please rename the existing connections",
"warning",
)
- session.rollback()
+ else:
+
+ dup_conn = Connection(
+ new_conn_id,
+ selected_conn.conn_type,
+ selected_conn.description,
+ selected_conn.host,
+ selected_conn.login,
+ selected_conn.password,
+ selected_conn.schema,
+ selected_conn.port,
+ selected_conn.extra,
+ )
+
+ try:
+ session.add(dup_conn)
+ session.commit()
+ flash(f"Connection {new_conn_id} added successfully.", "success")
+ except IntegrityError:
+ flash(
+ f"Connection {new_conn_id} can't be added. Integrity error, "
+ f"probably unique constraint.",
+ "warning",
+ )
+ session.rollback()
self.update_redirect()
return redirect(self.get_redirect())
diff --git a/tests/www/views/test_views_connection.py b/tests/www/views/test_views_connection.py
index 249bf2a..e729856 100644
--- a/tests/www/views/test_views_connection.py
+++ b/tests/www/views/test_views_connection.py
@@ -148,3 +148,35 @@ def test_duplicate_connection(admin_client):
response = {conn[0] for conn in session.query(Connection.conn_id).all()}
assert resp.status_code == 200
assert expected_result == response
+
+
+def test_duplicate_connection_error(admin_client):
+ """Test Duplicate multiple connection with suffix
+ when there are already 10 copies, no new copy
+ should be created"""
+
+ connection_ids = [f'test_duplicate_postgres_connection_copy{i}' for i in range(1, 11)]
+ connections = [
+ Connection(
+ conn_id=connection_id,
+ conn_type='FTP',
+ description='Postgres',
+ host='localhost',
+ schema='airflow',
+ port=3306,
+ )
+ for connection_id in connection_ids
+ ]
+
+ with create_session() as session:
+ session.query(Connection).delete()
+ session.add_all(connections)
+
+ data = {"action": "mulduplicate", "rowid": [connections[0].id]}
+ resp = admin_client.post('/connection/action_post', data=data, follow_redirects=True)
+
+ expected_result = {f'test_duplicate_postgres_connection_copy{i}' for i in range(1, 11)}
+
+ assert resp.status_code == 200
+ response = {conn[0] for conn in session.query(Connection.conn_id).all()}
+ assert expected_result == response