This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 45e61a9 Only show import errors for DAGs a user can access (#17835)
45e61a9 is described below
commit 45e61a965f64feffb18f6e064810a93b61a48c8a
Author: Jed Cunningham <[email protected]>
AuthorDate: Wed Aug 25 16:50:40 2021 -0600
Only show import errors for DAGs a user can access (#17835)
For new DAGs (ones that have not previously parsed successfully), import
errors will only be shown to users who can read all DAGs.
Closes: #17684
---
airflow/www/views.py | 12 ++-
tests/www/views/test_views_base.py | 50 -----------
tests/www/views/test_views_home.py | 180 +++++++++++++++++++++++++++++++++++++
3 files changed, 190 insertions(+), 52 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5a45bda..ceb035f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -694,8 +694,16 @@ class Airflow(AirflowBaseView):
import_errors = session.query(errors.ImportError).order_by(errors.ImportError.id).all()
- for import_error in import_errors:
- flash("Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=import_error), "dag_import_error")
+ if import_errors:
+ dag_filenames = {dag.fileloc for dag in dags}
+ all_dags_readable = (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG) in user_permissions
+
+ for import_error in import_errors:
+ if all_dags_readable or import_error.filename in dag_filenames:
+ flash(
+ "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=import_error),
+ "dag_import_error",
+ )
from airflow.plugins_manager import import_errors as plugin_import_errors
diff --git a/tests/www/views/test_views_base.py b/tests/www/views/test_views_base.py
index 95b4319..1581588 100644
--- a/tests/www/views/test_views_base.py
+++ b/tests/www/views/test_views_base.py
@@ -18,15 +18,12 @@
import datetime
import json
-import flask
import pytest
from airflow import version
from airflow.jobs.base_job import BaseJob
from airflow.utils import timezone
from airflow.utils.session import create_session
-from airflow.utils.state import State
-from airflow.www.views import FILTER_STATUS_COOKIE, FILTER_TAGS_COOKIE
from tests.test_utils.asserts import assert_queries_count
from tests.test_utils.config import conf_vars
from tests.test_utils.www import check_content_in_response, check_content_not_in_response
@@ -51,29 +48,6 @@ def test_doc_urls(admin_client):
check_content_in_response("/api/v1/ui", resp)
-def test_home(capture_templates, admin_client):
- with capture_templates() as templates:
- resp = admin_client.get('home', follow_redirects=True)
- check_content_in_response('DAGs', resp)
- val_state_color_mapping = (
- 'const STATE_COLOR = {'
- '"deferred": "lightseagreen", "failed": "red", '
- '"null": "lightblue", "queued": "gray", '
- '"removed": "lightgrey", "restarting": "violet", "running": "lime", '
- '"scheduled": "tan", "sensing": "lightseagreen", '
- '"shutdown": "blue", "skipped": "pink", '
- '"success": "green", "up_for_reschedule": "turquoise", '
- '"up_for_retry": "gold", "upstream_failed": "orange"};'
- )
- check_content_in_response(val_state_color_mapping, resp)
-
- assert len(templates) == 1
- assert templates[0].name == 'airflow/dags.html'
- state_color_mapping = State.state_color.copy()
- state_color_mapping["null"] = state_color_mapping.pop(None)
- assert templates[0].local_context['state_color'] == state_color_mapping
-
-
@pytest.fixture()
def heartbeat_healthy():
# case-1: healthy scheduler status
@@ -395,30 +369,6 @@ def test_delete_user(app, admin_client, exist_username):
check_content_in_response("Deleted Row", resp)
-def test_home_filter_tags(admin_client):
- with admin_client:
- admin_client.get('home?tags=example&tags=data', follow_redirects=True)
- assert 'example,data' == flask.session[FILTER_TAGS_COOKIE]
-
- admin_client.get('home?reset_tags', follow_redirects=True)
- assert flask.session[FILTER_TAGS_COOKIE] is None
-
-
-def test_home_status_filter_cookie(admin_client):
- with admin_client:
- admin_client.get('home', follow_redirects=True)
- assert 'all' == flask.session[FILTER_STATUS_COOKIE]
-
- admin_client.get('home?status=active', follow_redirects=True)
- assert 'active' == flask.session[FILTER_STATUS_COOKIE]
-
- admin_client.get('home?status=paused', follow_redirects=True)
- assert 'paused' == flask.session[FILTER_STATUS_COOKIE]
-
- admin_client.get('home?status=all', follow_redirects=True)
- assert 'all' == flask.session[FILTER_STATUS_COOKIE]
-
-
@conf_vars({("webserver", "show_recent_stats_for_completed_runs"): "False"})
def test_task_stats_only_noncompleted(admin_client):
resp = admin_client.post('task_stats', follow_redirects=True)
diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py
new file mode 100644
index 0000000..0b85b22
--- /dev/null
+++ b/tests/www/views/test_views_home.py
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import flask
+import pytest
+
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.security import permissions
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.www.views import FILTER_STATUS_COOKIE, FILTER_TAGS_COOKIE
+from tests.test_utils.api_connexion_utils import create_user
+from tests.test_utils.db import clear_db_dags, clear_db_import_errors, clear_db_serialized_dags
+from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login
+
+
+def clean_db():
+ clear_db_dags()
+ clear_db_import_errors()
+ clear_db_serialized_dags()
+
+
[email protected](autouse=True)
+def setup():
+ clean_db()
+ yield
+ clean_db()
+
+
+def test_home(capture_templates, admin_client):
+ with capture_templates() as templates:
+ resp = admin_client.get('home', follow_redirects=True)
+ check_content_in_response('DAGs', resp)
+ val_state_color_mapping = (
+ 'const STATE_COLOR = {'
+ '"deferred": "lightseagreen", "failed": "red", '
+ '"null": "lightblue", "queued": "gray", '
+ '"removed": "lightgrey", "restarting": "violet", "running": "lime", '
+ '"scheduled": "tan", "sensing": "lightseagreen", '
+ '"shutdown": "blue", "skipped": "pink", '
+ '"success": "green", "up_for_reschedule": "turquoise", '
+ '"up_for_retry": "gold", "upstream_failed": "orange"};'
+ )
+ check_content_in_response(val_state_color_mapping, resp)
+
+ assert len(templates) == 1
+ assert templates[0].name == 'airflow/dags.html'
+ state_color_mapping = State.state_color.copy()
+ state_color_mapping["null"] = state_color_mapping.pop(None)
+ assert templates[0].local_context['state_color'] == state_color_mapping
+
+
+def test_home_filter_tags(admin_client):
+ with admin_client:
+ admin_client.get('home?tags=example&tags=data', follow_redirects=True)
+ assert 'example,data' == flask.session[FILTER_TAGS_COOKIE]
+
+ admin_client.get('home?reset_tags', follow_redirects=True)
+ assert flask.session[FILTER_TAGS_COOKIE] is None
+
+
+def test_home_status_filter_cookie(admin_client):
+ with admin_client:
+ admin_client.get('home', follow_redirects=True)
+ assert 'all' == flask.session[FILTER_STATUS_COOKIE]
+
+ admin_client.get('home?status=active', follow_redirects=True)
+ assert 'active' == flask.session[FILTER_STATUS_COOKIE]
+
+ admin_client.get('home?status=paused', follow_redirects=True)
+ assert 'paused' == flask.session[FILTER_STATUS_COOKIE]
+
+ admin_client.get('home?status=all', follow_redirects=True)
+ assert 'all' == flask.session[FILTER_STATUS_COOKIE]
+
+
[email protected](scope="module")
+def user_single_dag(app):
+ """Create User that can only access the first DAG from TEST_FILTER_DAG_IDS"""
+ return create_user(
+ app,
+ username="user_single_dag",
+ role_name="role_single_dag",
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+ (permissions.ACTION_CAN_READ, permissions.resource_name_for_dag(TEST_FILTER_DAG_IDS[0])),
+ ],
+ )
+
+
[email protected]()
+def client_single_dag(app, user_single_dag):
+ """Client for User that can only access the first DAG from TEST_FILTER_DAG_IDS"""
+ return client_with_login(
+ app,
+ username="user_single_dag",
+ password="user_single_dag",
+ )
+
+
+TEST_FILTER_DAG_IDS = ['filter_test_1', 'filter_test_2']
+
+
+def _process_file(file_path, session):
+ dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+ dag_file_processor.process_file(file_path, [], False, session)
+
+
[email protected]()
+def working_dags(tmpdir):
+ dag_contents_template = "from airflow import DAG\ndag = DAG('{}')"
+
+ with create_session() as session:
+ for dag_id in TEST_FILTER_DAG_IDS:
+ filename = tmpdir / f"{dag_id}.py"
+ with open(filename, "w") as f:
+ f.writelines(dag_contents_template.format(dag_id))
+ _process_file(filename, session)
+
+
[email protected]()
+def broken_dags(tmpdir, working_dags):
+ with create_session() as session:
+ for dag_id in TEST_FILTER_DAG_IDS:
+ filename = tmpdir / f"{dag_id}.py"
+ with open(filename, "w") as f:
+ f.writelines('airflow DAG')
+ _process_file(filename, session)
+
+
+def test_home_importerrors(broken_dags, user_client):
+ # Users with "can read on DAGs" gets all DAG import errors
+ resp = user_client.get('home', follow_redirects=True)
+ check_content_in_response("Import Errors", resp)
+ for dag_id in TEST_FILTER_DAG_IDS:
+ check_content_in_response(f"/{dag_id}.py", resp)
+
+
+def test_home_importerrors_filtered_singledag_user(broken_dags, client_single_dag):
+ # Users that can only see certain DAGs get a filtered list of import errors
+ resp = client_single_dag.get('home', follow_redirects=True)
+ check_content_in_response("Import Errors", resp)
+ # They can see the first DAGs import error
+ check_content_in_response(f"/{TEST_FILTER_DAG_IDS[0]}.py", resp)
+ # But not the rest
+ for dag_id in TEST_FILTER_DAG_IDS[1:]:
+ check_content_not_in_response(f"/{dag_id}.py", resp)
+
+
+def test_home_dag_list(working_dags, user_client):
+ # Users with "can read on DAGs" gets all DAGs
+ resp = user_client.get('home', follow_redirects=True)
+ for dag_id in TEST_FILTER_DAG_IDS:
+ check_content_in_response(f"dag_id={dag_id}", resp)
+
+
+def test_home_dag_list_filtered_singledag_user(working_dags, client_single_dag):
+ # Users that can only see certain DAGs get a filtered list
+ resp = client_single_dag.get('home', follow_redirects=True)
+ # They can see the first DAG
+ check_content_in_response(f"dag_id={TEST_FILTER_DAG_IDS[0]}", resp)
+ # But not the rest
+ for dag_id in TEST_FILTER_DAG_IDS[1:]:
+ check_content_not_in_response(f"dag_id={dag_id}", resp)