This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 7ab6210 Prepend `DAG:` to dag permissions (#11189)
7ab6210 is described below
commit 7ab62100af8a59694721e06b213a933869c6a1ed
Author: James Timmins <[email protected]>
AuthorDate: Thu Oct 15 16:32:38 2020 -0700
Prepend `DAG:` to dag permissions (#11189)
This adds the prefix DAG: to newly created dag permissions. It supports
checking permissions on both prefixed and un-prefixed DAG permission names.
This will make it easier to identify permissions that related to granular
dag access.
This PR does not modify existing dag permission names to use the new
prefixed naming scheme. That will come in a separate PR.
Related to issue #10469
---
UPDATING.md | 10 +
airflow/api_connexion/endpoints/dag_endpoint.py | 9 +-
.../api_connexion/endpoints/dag_run_endpoint.py | 11 +-
.../api_connexion/endpoints/extra_link_endpoint.py | 3 +-
airflow/api_connexion/endpoints/log_endpoint.py | 5 +-
airflow/api_connexion/endpoints/task_endpoint.py | 5 +-
.../endpoints/task_instance_endpoint.py | 17 +-
airflow/api_connexion/endpoints/xcom_endpoint.py | 15 +-
airflow/api_connexion/security.py | 6 +-
.../849da589634d_prefix_dag_permissions.py | 115 +++++++++++
airflow/models/dag.py | 33 +++-
airflow/security/permissions.py | 28 +++
airflow/www/app.py | 1 +
airflow/www/decorators.py | 50 +++--
airflow/www/security.py | 166 ++++++++++------
airflow/www/views.py | 13 +-
tests/api_connexion/endpoints/test_dag_endpoint.py | 9 +-
.../endpoints/test_dag_run_endpoint.py | 3 +-
.../endpoints/test_extra_link_endpoint.py | 3 +-
tests/api_connexion/endpoints/test_log_endpoint.py | 7 +-
.../api_connexion/endpoints/test_task_endpoint.py | 7 +-
.../endpoints/test_task_instance_endpoint.py | 3 +-
.../api_connexion/endpoints/test_xcom_endpoint.py | 3 +-
tests/cli/commands/test_sync_perm_command.py | 4 +-
tests/models/test_dag.py | 13 ++
tests/serialization/test_dag_serialization.py | 6 +-
tests/test_utils/fab_utils.py | 80 ++++++++
tests/www/test_security.py | 220 +++++++++++----------
tests/www/test_views.py | 57 +++---
29 files changed, 643 insertions(+), 259 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 230a2a6..75a1d18 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -50,6 +50,16 @@ assists users migrating to a new version.
## Airflow Master
+### Change to Permissions
+
+The DAG-level permission actions, `can_dag_read` and `can_dag_edit` are going
away. They are being replaced with `can_read` and `can_edit`. When a role is
given DAG-level access, the resource name (or "view menu", in Flask App-Builder
parlance) will now be prefixed with `DAG:`. So the action `can_dag_read` on
`example_dag_id`, is now represented as `can_read` on `DAG:example_dag_id`.
+
+*As part of running `db upgrade`, existing permissions will be migrated for
you.*
+
+When DAGs are initialized with the `access_control` variable set, any usage of
the old permission names will automatically be updated in the database, so this
won't be a breaking change. A DeprecationWarning will be raised.
+
+### Changes to Airflow Plugins
+
If you are using Airflow Plugins and were passing `admin_views` & `menu_links`
which were used in the
non-RBAC UI (`flask-admin` based UI), upto it to use `flask_appbuilder_views`
and `flask_appbuilder_menu_links`.
diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py
b/airflow/api_connexion/endpoints/dag_endpoint.py
index 7c32a43..33c3643 100644
--- a/airflow/api_connexion/endpoints/dag_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_endpoint.py
@@ -28,10 +28,11 @@ from airflow.api_connexion.schemas.dag_schema import (
dags_collection_schema,
)
from airflow.models.dag import DagModel
+from airflow.security import permissions
from airflow.utils.session import provide_session
[email protected]_access([("can_read", "Dag")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS)])
@provide_session
def get_dag(dag_id, session):
"""
@@ -45,7 +46,7 @@ def get_dag(dag_id, session):
return dag_schema.dump(dag)
[email protected]_access([("can_read", "Dag")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS)])
def get_dag_details(dag_id):
"""
Get details of DAG.
@@ -56,7 +57,7 @@ def get_dag_details(dag_id):
return dag_detail_schema.dump(dag)
[email protected]_access([("can_read", "Dag")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS)])
@format_parameters({'limit': check_limit})
def get_dags(limit, offset=0):
"""
@@ -69,7 +70,7 @@ def get_dags(limit, offset=0):
return dags_collection_schema.dump(DAGCollection(dags=dags,
total_entries=total_entries))
[email protected]_access([("can_edit", "Dag")])
[email protected]_access([("can_edit", permissions.RESOURCE_DAGS)])
@provide_session
def patch_dag(session, dag_id, update_mask=None):
"""
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 8f2a3e0..2d3a297 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -28,11 +28,12 @@ from airflow.api_connexion.schemas.dag_run_schema import (
dagruns_batch_form_schema,
)
from airflow.models import DagModel, DagRun
+from airflow.security import permissions
from airflow.utils.session import provide_session
from airflow.utils.types import DagRunType
[email protected]_access([("can_read", "Dag"), ("can_delete", "DagRun")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS),
("can_delete", "DagRun")])
@provide_session
def delete_dag_run(dag_id, dag_run_id, session):
"""
@@ -43,7 +44,7 @@ def delete_dag_run(dag_id, dag_run_id, session):
return NoContent, 204
[email protected]_access([("can_read", "Dag"), ("can_read", "DagRun")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun")])
@provide_session
def get_dag_run(dag_id, dag_run_id, session):
"""
@@ -58,7 +59,7 @@ def get_dag_run(dag_id, dag_run_id, session):
return dagrun_schema.dump(dag_run)
[email protected]_access([("can_read", "Dag"), ("can_read", "DagRun")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun")])
@format_parameters(
{
'start_date_gte': format_datetime,
@@ -157,7 +158,7 @@ def _apply_date_filters_to_query(
return query
[email protected]_access([("can_read", "Dag"), ("can_read", "DagRun")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun")])
@provide_session
def get_dag_runs_batch(session):
"""
@@ -193,7 +194,7 @@ def get_dag_runs_batch(session):
return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_runs,
total_entries=total_entries))
[email protected]_access([("can_read", "Dag"), ("can_create", "DagRun")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS),
("can_create", "DagRun")])
@provide_session
def post_dag_run(dag_id, session):
"""
diff --git a/airflow/api_connexion/endpoints/extra_link_endpoint.py
b/airflow/api_connexion/endpoints/extra_link_endpoint.py
index e831d1b..4ec0554 100644
--- a/airflow/api_connexion/endpoints/extra_link_endpoint.py
+++ b/airflow/api_connexion/endpoints/extra_link_endpoint.py
@@ -23,12 +23,13 @@ from airflow.api_connexion.exceptions import NotFound
from airflow.exceptions import TaskNotFound
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
+from airflow.security import permissions
from airflow.utils.session import provide_session
@security.requires_access(
[
- ('can_read', 'Dag'),
+ ('can_read', permissions.RESOURCE_DAGS),
('can_read', 'DagRun'),
('can_read', 'Task'),
('can_read', 'TaskInstance'),
diff --git a/airflow/api_connexion/endpoints/log_endpoint.py
b/airflow/api_connexion/endpoints/log_endpoint.py
index 4b48ed0..32d5396 100644
--- a/airflow/api_connexion/endpoints/log_endpoint.py
+++ b/airflow/api_connexion/endpoints/log_endpoint.py
@@ -23,11 +23,14 @@ from airflow.api_connexion import security
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.schemas.log_schema import LogResponseObject,
logs_schema
from airflow.models import DagRun
+from airflow.security import permissions
from airflow.utils.log.log_reader import TaskLogReader
from airflow.utils.session import provide_session
[email protected]_access([('can_read', 'Dag'), ('can_read', 'DagRun'),
('can_read', 'Task')])
[email protected]_access(
+ [('can_read', permissions.RESOURCE_DAGS), ('can_read', 'DagRun'),
('can_read', 'Task')]
+)
@provide_session
def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
full_content=False, token=None):
"""
diff --git a/airflow/api_connexion/endpoints/task_endpoint.py
b/airflow/api_connexion/endpoints/task_endpoint.py
index 4626962..d818ebb 100644
--- a/airflow/api_connexion/endpoints/task_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_endpoint.py
@@ -21,9 +21,10 @@ from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.schemas.task_schema import TaskCollection,
task_collection_schema, task_schema
from airflow.exceptions import TaskNotFound
+from airflow.security import permissions
[email protected]_access([("can_read", "Dag"), ("can_read", "Task")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS),
("can_read", "Task")])
def get_task(dag_id, task_id):
"""
Get simplified representation of a task.
@@ -39,7 +40,7 @@ def get_task(dag_id, task_id):
return task_schema.dump(task)
[email protected]_access([("can_read", "Dag"), ("can_read", "Task")])
[email protected]_access([("can_read", permissions.RESOURCE_DAGS),
("can_read", "Task")])
def get_tasks(dag_id):
"""
Get tasks for DAG
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index cd66bc3..b9ee25f 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -37,13 +37,14 @@ from airflow.api_connexion.schemas.task_instance_schema
import (
from airflow.models.dagrun import DagRun as DR
from airflow.models.taskinstance import clear_task_instances, TaskInstance as
TI
from airflow.models import SlaMiss
+from airflow.security import permissions
from airflow.utils.state import State
from airflow.utils.session import provide_session
@security.requires_access(
[
- ("can_read", "Dag"),
+ ("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun"),
("can_read", "Task"),
]
@@ -101,7 +102,7 @@ def _apply_range_filter(query, key, value_range: Tuple[Any,
Any]):
)
@security.requires_access(
[
- ("can_read", "Dag"),
+ ("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun"),
("can_read", "Task"),
]
@@ -169,7 +170,9 @@ def get_task_instances(
)
[email protected]_access([("can_read", "Dag"), ("can_read", "DagRun"),
("can_read", "Task")])
[email protected]_access(
+ [("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun"),
("can_read", "Task")]
+)
@provide_session
def get_task_instances_batch(session=None):
"""
@@ -223,7 +226,9 @@ def get_task_instances_batch(session=None):
)
[email protected]_access([("can_read", "Dag"), ("can_read", "DagRun"),
("can_edit", "Task")])
[email protected]_access(
+ [("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun"),
("can_edit", "Task")]
+)
@provide_session
def post_clear_task_instances(dag_id: str, session=None):
"""
@@ -263,7 +268,9 @@ def post_clear_task_instances(dag_id: str, session=None):
)
[email protected]_access([("can_read", "Dag"), ("can_read", "DagRun"),
("can_edit", "Task")])
[email protected]_access(
+ [("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun"),
("can_edit", "Task")]
+)
@provide_session
def post_set_task_instances_state(dag_id, session):
"""Set a state of task instances."""
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py
b/airflow/api_connexion/endpoints/xcom_endpoint.py
index 9d8e888..39dc503 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -31,11 +31,17 @@ from airflow.api_connexion.schemas.xcom_schema import (
xcom_collection_schema,
)
from airflow.models import DagRun as DR, XCom
+from airflow.security import permissions
from airflow.utils.session import provide_session
@security.requires_access(
- [("can_read", "Dag"), ("can_read", "DagRun"), ("can_read", "Task"),
("can_read", "XCom")]
+ [
+ ("can_read", permissions.RESOURCE_DAGS),
+ ("can_read", "DagRun"),
+ ("can_read", "Task"),
+ ("can_read", "XCom"),
+ ]
)
@format_parameters({'limit': check_limit})
@provide_session
@@ -71,7 +77,12 @@ def get_xcom_entries(
@security.requires_access(
- [("can_read", "Dag"), ("can_read", "DagRun"), ("can_read", "Task"),
("can_read", "XCom")]
+ [
+ ("can_read", permissions.RESOURCE_DAGS),
+ ("can_read", "DagRun"),
+ ("can_read", "Task"),
+ ("can_read", "XCom"),
+ ]
)
@provide_session
def get_xcom_entry(
diff --git a/airflow/api_connexion/security.py
b/airflow/api_connexion/security.py
index 80afe12..373a700 100644
--- a/airflow/api_connexion/security.py
+++ b/airflow/api_connexion/security.py
@@ -20,6 +20,7 @@ from typing import Callable, Optional, Sequence, Tuple,
TypeVar, cast
from flask import Response, current_app, g
+from airflow.security.permissions import RESOURCE_DAGS
from airflow.api_connexion.exceptions import PermissionDenied, Unauthenticated
T = TypeVar("T", bound=Callable) # pylint: disable=invalid-name
@@ -38,7 +39,7 @@ def can_access_any_dags(action: str, dag_id: Optional[int] =
None) -> bool:
"""Checks if user has read or write access to some dags."""
appbuilder = current_app.appbuilder
if dag_id and dag_id != '~':
- return appbuilder.sm.has_access(action, dag_id)
+ return appbuilder.sm.has_access(action,
appbuilder.sm.prefixed_dag_id(dag_id))
user = g.user
if action == 'can_read':
@@ -54,7 +55,7 @@ def check_authorization(
return
appbuilder = current_app.appbuilder
for permission in permissions:
- if permission in (('can_read', 'Dag'), ('can_edit', 'Dag')):
+ if permission in (('can_read', RESOURCE_DAGS), ('can_edit',
RESOURCE_DAGS)):
can_access_all_dags = appbuilder.sm.has_access(*permission)
if can_access_all_dags:
continue
@@ -79,7 +80,6 @@ def requires_access(permissions: Optional[Sequence[Tuple[str,
str]]] = None) ->
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
-
check_authentication()
check_authorization(permissions, kwargs.get('dag_id'))
diff --git a/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
b/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
new file mode 100644
index 0000000..23fd96a
--- /dev/null
+++ b/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Prefix DAG permissions.
+
+Revision ID: 849da589634d
+Revises: 52d53670a240
+Create Date: 2020-10-01 17:25:10.006322
+
+"""
+
+from airflow.security import permissions
+from airflow.www.app import cached_app
+
+# revision identifiers, used by Alembic.
+revision = '849da589634d'
+down_revision = '52d53670a240'
+branch_labels = None
+depends_on = None
+
+
+def upgrade(): # noqa: D103
+ permissions = ['can_dag_read', 'can_dag_edit']
+ view_menus = cached_app().appbuilder.sm.get_all_view_menu()
+ convert_permissions(permissions, view_menus, upgrade_action,
upgrade_dag_id)
+
+
+def downgrade(): # noqa: D103
+ permissions = ['can_read', 'can_edit']
+ vms = cached_app().appbuilder.sm.get_all_view_menu()
+ view_menus = [
+ vm for vm in vms if (vm.name == permissions.RESOURCE_DAGS or
vm.name.startswith('DAG:'))
+ ]
+ convert_permissions(permissions, view_menus, downgrade_action,
downgrade_dag_id)
+
+
+def upgrade_dag_id(dag_id):
+ """Adds the 'DAG:' prefix to a DAG view if appropriate."""
+ if dag_id == 'all_dags':
+ return permissions.RESOURCE_DAGS
+ if dag_id.startswith("DAG:"):
+ return dag_id
+ return f"DAG:{dag_id}"
+
+
+def downgrade_dag_id(dag_id):
+ """Removes the 'DAG:' prefix from a DAG view name to return the DAG id."""
+ if dag_id == permissions.RESOURCE_DAGS:
+ return 'all_dags'
+ if dag_id.startswith("DAG:"):
+ return dag_id[len("DAG:"):]
+ return dag_id
+
+
+def upgrade_action(action):
+ """Converts the a DAG permission name from the old style to the new
style."""
+ if action == 'can_dag_read':
+ return 'can_read'
+ return 'can_edit'
+
+
+def downgrade_action(action):
+ """Converts the a DAG permission name from the old style to the new
style."""
+ if action == 'can_read':
+ return 'can_dag_read'
+ return 'can_dag_edit'
+
+
+def convert_permissions(permissions, view_menus, convert_action,
convert_dag_id):
+ """Creates new empty role in DB"""
+ appbuilder = cached_app().appbuilder # pylint: disable=no-member
+ roles = appbuilder.sm.get_all_roles()
+ views_to_remove = set()
+ for permission_name in permissions: # pylint:
disable=too-many-nested-blocks
+ for view_menu in view_menus:
+ view_name = view_menu.name
+ old_pvm = appbuilder.sm.find_permission_view_menu(permission_name,
view_name)
+ if not old_pvm:
+ continue
+
+ views_to_remove.add(view_name)
+ new_permission_name = convert_action(permission_name)
+ new_pvm =
appbuilder.sm.add_permission_view_menu(new_permission_name,
convert_dag_id(view_name))
+ for role in roles:
+ if appbuilder.sm.exist_permission_on_roles(view_name,
permission_name, [role.id]):
+ appbuilder.sm.add_permission_role(role, new_pvm)
+ appbuilder.sm.del_permission_role(role, old_pvm)
+ print(f"DELETING: {role.name} ---->
{view_name}.{permission_name}")
+ appbuilder.sm.del_permission_view_menu(permission_name, view_name)
+ print(f"DELETING: perm_view ---->
{view_name}.{permission_name}")
+ for view_name in views_to_remove:
+ if appbuilder.sm.find_view_menu(view_name):
+ appbuilder.sm.del_view_menu(view_name)
+ print(f"DELETING: view_menu ----> {view_name}")
+
+ if 'can_dag_read' in permissions:
+ for permission_name in permissions:
+ if appbuilder.sm.find_permission(permission_name):
+ appbuilder.sm.del_permission(permission_name)
+ print(f"DELETING: permission ----> {permission_name}")
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 3553920..45269f3 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -51,6 +51,7 @@ from airflow.models.dagcode import DagCode
from airflow.models.dagpickle import DagPickle
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import Context, TaskInstance,
clear_task_instances
+from airflow.security import permissions
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dates import cron_presets, date_range as utils_date_range
@@ -176,7 +177,7 @@ class DAG(BaseDag, LoggingMixin):
that it is executed when the dag succeeds.
:type on_success_callback: callable
:param access_control: Specify optional DAG-level permissions, e.g.,
- "{'role1': {'can_dag_read'}, 'role2': {'can_dag_read',
'can_dag_edit'}}"
+ "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}"
:type access_control: dict
:param is_paused_upon_creation: Specifies if the dag is paused when
created for the first time.
If the dag exists already, this flag will be ignored. If this optional
parameter
@@ -332,7 +333,7 @@ class DAG(BaseDag, LoggingMixin):
self.on_failure_callback = on_failure_callback
self.doc_md = doc_md
- self._access_control = access_control
+ self._access_control =
DAG._upgrade_outdated_dag_access_control(access_control)
self.is_paused_upon_creation = is_paused_upon_creation
self.jinja_environment_kwargs = jinja_environment_kwargs
@@ -382,6 +383,32 @@ class DAG(BaseDag, LoggingMixin):
# /Context Manager ----------------------------------------------
+ @staticmethod
+ def _upgrade_outdated_dag_access_control(access_control=None):
+ """
+ Looks for outdated dag level permissions (can_dag_read and
can_dag_edit) in DAG
+ access_controls (for example, {'role1': {'can_dag_read'}, 'role2':
{'can_dag_read', 'can_dag_edit'}})
+ and replaces them with updated permissions (can_read and can_edit).
+ """
+ if not access_control:
+ return None
+ new_perm_mapping = {
+ permissions.DEPRECATED_ACTION_CAN_DAG_READ:
permissions.ACTION_CAN_READ,
+ permissions.DEPRECATED_ACTION_CAN_DAG_EDIT:
permissions.ACTION_CAN_EDIT,
+ }
+ updated_access_control = {}
+ for role, perms in access_control.items():
+ updated_access_control[role] = {new_perm_mapping.get(perm, perm)
for perm in perms}
+
+ if access_control != updated_access_control:
+ warnings.warn(
+ "The 'can_dag_read' and 'can_dag_edit' permissions are
deprecated. "
+ "Please use 'can_read' and 'can_edit', respectively.",
+ DeprecationWarning, stacklevel=3
+ )
+
+ return updated_access_control
+
def date_range(
self,
start_date: datetime,
@@ -651,7 +678,7 @@ class DAG(BaseDag, LoggingMixin):
@access_control.setter
def access_control(self, value):
- self._access_control = value
+ self._access_control = DAG._upgrade_outdated_dag_access_control(value)
@property
def description(self) -> Optional[str]:
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
new file mode 100644
index 0000000..94d2cc2
--- /dev/null
+++ b/airflow/security/permissions.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Resource Constants
+RESOURCE_DAGS = 'Dags'
+RESOURCE_DAG_PREFIX = 'DAG:'
+
+# Action Constants
+ACTION_CAN_CREATE = 'can_create'
+ACTION_CAN_READ = 'can_read'
+ACTION_CAN_EDIT = 'can_edit'
+ACTION_CAN_DELETE = 'can_delete'
+DEPRECATED_ACTION_CAN_DAG_READ = 'can_dag_read'
+DEPRECATED_ACTION_CAN_DAG_EDIT = 'can_dag_edit'
diff --git a/airflow/www/app.py b/airflow/www/app.py
index e6f3104..21d7d3b 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -58,6 +58,7 @@ def sync_appbuilder_roles(flask_app):
if conf.getboolean('webserver', 'UPDATE_FAB_PERMS'):
security_manager = flask_app.appbuilder.sm
security_manager.sync_roles()
+ security_manager.sync_resource_permissions()
def create_app(config=None, testing=False, app_name="Airflow"):
diff --git a/airflow/www/decorators.py b/airflow/www/decorators.py
index d73967a..4ee0af9 100644
--- a/airflow/www/decorators.py
+++ b/airflow/www/decorators.py
@@ -34,6 +34,7 @@ def action_logging(f: T) -> T:
"""
Decorator to log user actions
"""
+
@functools.wraps(f)
def wrapper(*args, **kwargs):
@@ -50,11 +51,11 @@ def action_logging(f: T) -> T:
owner=user,
extra=str([(k, v) for k, v in request.values.items() if k not
in fields_skip_logging]),
task_id=request.values.get('task_id'),
- dag_id=request.values.get('dag_id'))
+ dag_id=request.values.get('dag_id'),
+ )
if 'execution_date' in request.values:
- log.execution_date = pendulum.parse(
- request.values.get('execution_date'), strict=False)
+ log.execution_date =
pendulum.parse(request.values.get('execution_date'), strict=False)
session.add(log)
@@ -67,6 +68,7 @@ def gzipped(f: T) -> T:
"""
Decorator to make a view compressed
"""
+
@functools.wraps(f)
def view_func(*args, **kwargs):
@after_this_request
@@ -78,12 +80,14 @@ def gzipped(f: T) -> T:
response.direct_passthrough = False
- if (response.status_code < 200 or response.status_code >= 300 or
- 'Content-Encoding' in response.headers):
+ if (
+ response.status_code < 200
+ or response.status_code >= 300
+ or 'Content-Encoding' in response.headers
+ ):
return response
gzip_buffer = IO()
- gzip_file = gzip.GzipFile(mode='wb',
- fileobj=gzip_buffer)
+ gzip_file = gzip.GzipFile(mode='wb', fileobj=gzip_buffer)
gzip_file.write(response.data)
gzip_file.close()
@@ -103,30 +107,22 @@ def has_dag_access(**dag_kwargs) -> Callable[[T], T]:
"""
Decorator to check whether the user has read / write permission on the dag.
"""
+
def decorator(f: T):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
- has_access = self.appbuilder.sm.has_access
dag_id = request.values.get('dag_id')
- # if it is false, we need to check whether user has write access
on the dag
- can_dag_edit = dag_kwargs.get('can_dag_edit', False)
-
- # 1. check whether the user has can_dag_edit permissions on
all_dags
- # 2. if 1 false, check whether the user
- # has can_dag_edit permissions on the dag
- # 3. if 2 false, check whether it is can_dag_read view,
- # and whether user has the permissions
- if (
- has_access('can_dag_edit', 'all_dags') or
- has_access('can_dag_edit', dag_id) or (not can_dag_edit and
-
(has_access('can_dag_read',
- 'all_dags')
or
-
has_access('can_dag_read',
- dag_id)))):
- return f(self, *args, **kwargs)
+ needs_edit_access = dag_kwargs.get('can_dag_edit', False)
+
+ if needs_edit_access:
+ if self.appbuilder.sm.can_edit_dag(dag_id):
+ return f(self, *args, **kwargs)
else:
- flash("Access is Denied", "danger")
- return redirect(url_for(self.appbuilder.sm.auth_view.
- __class__.__name__ + ".login"))
+ if self.appbuilder.sm.can_read_dag(dag_id):
+ return f(self, *args, **kwargs)
+ flash("Access is Denied", "danger")
+ return
redirect(url_for(self.appbuilder.sm.auth_view.__class__.__name__ + ".login"))
+
return cast(T, wrapper)
+
return decorator
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 20686b7..9aa543e 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -29,6 +29,7 @@ from sqlalchemy.orm import joinedload
from airflow import models
from airflow.exceptions import AirflowException
from airflow.models import DagModel
+from airflow.security import permissions
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.www.utils import CustomSQLAInterface
@@ -41,13 +42,6 @@ EXISTING_ROLES = {
'Public',
}
-CAN_CREATE = 'can_create'
-CAN_READ = 'can_read'
-CAN_DAG_READ = 'can_dag_read'
-CAN_EDIT = 'can_edit'
-CAN_DAG_EDIT = 'can_dag_edit'
-CAN_DELETE = 'can_delete'
-
class AirflowSecurityManager(SecurityManager, LoggingMixin):
"""Custom security manager, which introduces an permission model adapted
to Airflow"""
@@ -160,19 +154,10 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
# [END security_op_perms]
# global view-menu for dag-level access
- DAG_VMS = {'all_dags'}
+ DAG_VMS = {permissions.RESOURCE_DAGS}
- WRITE_DAG_PERMS = {
- 'can_dag_edit',
- 'can_edit',
- }
-
- READ_DAG_PERMS = {
- 'can_dag_read',
- 'can_read',
- }
-
- DAG_PERMS = WRITE_DAG_PERMS | READ_DAG_PERMS
+ READ_DAG_PERMS = {permissions.ACTION_CAN_READ}
+ DAG_PERMS = {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}
###########################################################################
# DEFAULT ROLE CONFIGURATIONS
@@ -280,11 +265,11 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
def get_readable_dags(self, user):
"""Gets the DAGs readable by authenticated user."""
- return self.get_accessible_dags([CAN_READ, CAN_DAG_READ], user)
+ return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
def get_editable_dags(self, user):
"""Gets the DAGs editable by authenticated user."""
- return self.get_accessible_dags([CAN_EDIT, CAN_DAG_EDIT], user)
+ return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
def get_readable_dag_ids(self, user) -> Set[str]:
"""Gets the DAG IDs readable by authenticated user."""
@@ -296,7 +281,9 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
def get_accessible_dag_ids(self, user) -> Set[str]:
"""Gets the DAG IDs editable or readable by authenticated user."""
- accessible_dags = self.get_accessible_dags([CAN_EDIT, CAN_DAG_EDIT,
CAN_READ, CAN_DAG_READ], user)
+ accessible_dags = self.get_accessible_dags(
+ [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ], user
+ )
return set(dag.dag_id for dag in accessible_dags)
@provide_session
@@ -320,33 +307,83 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
for permission in role.permissions:
resource = permission.view_menu.name
action = permission.permission.name
- if action in user_actions:
+ if action not in user_actions:
+ continue
+
+ if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
+
resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
+ else:
resources.add(resource)
- if bool({'Dag', 'all_dags'}.intersection(resources)):
+ if permissions.RESOURCE_DAGS in resources:
return session.query(DagModel)
return session.query(DagModel).filter(DagModel.dag_id.in_(resources))
- def has_access(self, permission, view_name, user=None) -> bool:
+ def can_read_dag(self, dag_id, user=None) -> bool:
+ """Determines whether a user has DAG read access."""
+ if not user:
+ user = g.user
+ prefixed_dag_id = self.prefixed_dag_id(dag_id)
+ return self._has_view_access(
+ user, permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS
+ ) or self._has_view_access(user, permissions.ACTION_CAN_READ,
prefixed_dag_id)
+
+ def can_edit_dag(self, dag_id, user=None) -> bool:
+ """Determines whether a user has DAG edit access."""
+ if not user:
+ user = g.user
+ prefixed_dag_id = self.prefixed_dag_id(dag_id)
+
+ return self._has_view_access(
+ user, permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAGS
+ ) or self._has_view_access(user, permissions.ACTION_CAN_EDIT,
prefixed_dag_id)
+
+ def prefixed_dag_id(self, dag_id):
+ """Returns the permission name for a DAG id."""
+ if dag_id == permissions.RESOURCE_DAGS:
+ return dag_id
+
+ if dag_id.startswith(permissions.RESOURCE_DAG_PREFIX):
+ return dag_id
+ return f"{permissions.RESOURCE_DAG_PREFIX}{dag_id}"
+
+ def is_dag_resource(self, resource_name):
+ """Determines if a permission belongs to a DAG or all DAGs."""
+ if resource_name == permissions.RESOURCE_DAGS:
+ return True
+ return resource_name.startswith(permissions.RESOURCE_DAG_PREFIX)
+
+ def has_access(self, permission, resource, user=None) -> bool:
"""
Verify whether a given user could perform certain permission
- (e.g can_read, can_write) on the given dag_id.
+ (e.g can_read, can_write) on the given resource.
- :param permission: permission on dag_id(e.g can_read, can_edit).
+ :param permission: permission on resource (e.g can_read, can_edit).
:type permission: str
- :param view_name: name of view-menu(e.g dag id is a view-menu as well).
- :type view_name: str
+ :param resource: name of view-menu or resource.
+ :type resource: str
:param user: user name
:type user: str
- :return: a bool whether user could perform certain permission on the
dag_id.
+ :return: a bool whether user could perform certain permission on the
resource.
:rtype bool
"""
if not user:
user = g.user
+ # breakpoint()
if user.is_anonymous:
- return self.is_item_public(permission, view_name)
- return self._has_view_access(user, permission, view_name)
+ return self.is_item_public(permission, resource)
+
+ has_access = self._has_view_access(user, permission, resource)
+ # FAB built-in view access method. Won't work for AllDag access.
+
+ if self.is_dag_resource(resource):
+ if permission == permissions.ACTION_CAN_READ:
+ has_access |= self.can_read_dag(resource, user)
+ elif permission == permissions.ACTION_CAN_EDIT:
+ has_access |= self.can_edit_dag(resource, user)
+
+ return has_access
def _get_and_cache_perms(self):
"""
@@ -377,13 +414,13 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
"""
Has all the dag access in any of the 3 cases:
1. Role needs to be in (Admin, Viewer, User, Op).
- 2. Has can_dag_read permission on all_dags view.
- 3. Has can_dag_edit permission on all_dags view.
+ 2. Has can_read permission on dags view.
+ 3. Has can_edit permission on dags view.
"""
return (
self._has_role(['Admin', 'Viewer', 'Op', 'User'])
- or self._has_perm('can_dag_read', 'all_dags')
- or self._has_perm('can_dag_edit', 'all_dags')
+ or self._has_perm(permissions.ACTION_CAN_READ,
permissions.RESOURCE_DAGS)
+ or self._has_perm(permissions.ACTION_CAN_EDIT,
permissions.RESOURCE_DAGS)
)
def clean_perms(self):
@@ -467,10 +504,10 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
.all()
)
- # create can_dag_edit and can_dag_read permissions for every dag(vm)
+ # create can_edit and can_read permissions for every dag(vm)
for dag in all_dags_models:
for perm in self.DAG_PERMS:
- merge_pv(perm, dag.dag_id)
+ merge_pv(perm, self.prefixed_dag_id(dag.dag_id))
# for all the dag-level role, add the permission of viewer
# with the dag view to ab_permission_view
@@ -481,7 +518,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
update_perm_views = []
# need to remove all_dag vm from all the existing view-menus
- dag_vm = self.find_view_menu('all_dags')
+ dag_vm = self.find_view_menu(permissions.RESOURCE_DAGS)
ab_perm_view_role = sqla_models.assoc_permissionview_role
perm_view = self.permissionview_model
view_menu = self.viewmenu_model
@@ -501,6 +538,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
for role in dag_role:
# pylint: disable=no-member
# Get all the perm-view of the role
+
existing_perm_view_by_user =
self.get_session.query(ab_perm_view_role).filter(
ab_perm_view_role.columns.role_id == role.id
)
@@ -520,18 +558,23 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
def update_admin_perm_view(self):
"""
Admin should has all the permission-views, except the dag views.
- because Admin have already have all_dags permission.
+ because Admin have already have Dag permission.
Add the missing ones to the table for admin.
:return: None.
"""
- all_dag_view = self.find_view_menu('all_dags')
- dag_perm_ids = [self.find_permission('can_dag_edit').id,
self.find_permission('can_dag_read').id]
+ all_dag_view = self.find_view_menu(permissions.RESOURCE_DAGS)
+ dag_pvs = (
+ self.get_session.query(sqla_models.ViewMenu)
+
.filter(sqla_models.ViewMenu.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+ .all()
+ )
+ pv_ids = [pv.id for pv in dag_pvs]
pvms = (
self.get_session.query(sqla_models.PermissionView)
.filter(
~and_(
- sqla_models.PermissionView.permission_id.in_(dag_perm_ids),
+ sqla_models.PermissionView.view_menu_id.in_(pv_ids),
sqla_models.PermissionView.view_menu_id != all_dag_view.id,
)
)
@@ -553,7 +596,6 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
:return: None.
"""
- self.log.debug('Start syncing user roles.')
# Create global all-dag VM
self.create_perm_vm_for_all_dag()
@@ -564,19 +606,18 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
perms = config['perms']
self.init_role(role, vms, perms)
self.create_custom_dag_permission_view()
-
# init existing roles, the rest role could be created through UI.
self.update_admin_perm_view()
self.clean_perms()
- def sync_resource_permissions(self, permissions=None):
+ def sync_resource_permissions(self, perms=None):
"""
Populates resource-based permissions.
"""
- if not permissions:
+ if not perms:
return
- for action, resource in permissions:
+ for action, resource in perms:
self.add_view_menu(resource)
self.add_permission_view_menu(action, resource)
@@ -589,17 +630,18 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
:type dag_id: str
:param access_control: a dict where each key is a rolename and
each value is a set() of permission names (e.g.,
- {'can_dag_read'}
+ {'can_read'}
:type access_control: dict
:return:
"""
+ prefixed_dag_id = self.prefixed_dag_id(dag_id)
for dag_perm in self.DAG_PERMS:
- perm_on_dag = self.find_permission_view_menu(dag_perm, dag_id)
+ perm_on_dag = self.find_permission_view_menu(dag_perm,
prefixed_dag_id)
if perm_on_dag is None:
- self.add_permission_view_menu(dag_perm, dag_id)
+ self.add_permission_view_menu(dag_perm, prefixed_dag_id)
if access_control:
- self._sync_dag_view_permissions(dag_id, access_control)
+ self._sync_dag_view_permissions(prefixed_dag_id, access_control)
def _sync_dag_view_permissions(self, dag_id, access_control):
"""Set the access policy on the given DAG's ViewModel.
@@ -608,15 +650,16 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
:type dag_id: str
:param access_control: a dict where each key is a rolename and
each value is a set() of permission names (e.g.,
- {'can_dag_read'}
+ {'can_read'}
:type access_control: dict
"""
+ prefixed_dag_id = self.prefixed_dag_id(dag_id)
def _get_or_create_dag_permission(perm_name):
- dag_perm = self.find_permission_view_menu(perm_name, dag_id)
+ dag_perm = self.find_permission_view_menu(perm_name,
prefixed_dag_id)
if not dag_perm:
- self.log.info("Creating new permission '%s' on view '%s'",
perm_name, dag_id)
- dag_perm = self.add_permission_view_menu(perm_name, dag_id)
+ self.log.info("Creating new permission '%s' on view '%s'",
perm_name, prefixed_dag_id)
+ dag_perm = self.add_permission_view_menu(perm_name,
prefixed_dag_id)
return dag_perm
@@ -628,11 +671,14 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
target_perms_for_role = access_control.get(role.name, {})
if perm.permission.name not in target_perms_for_role:
self.log.info(
- "Revoking '%s' on DAG '%s' for role '%s'",
perm.permission, dag_id, role.name
+ "Revoking '%s' on DAG '%s' for role '%s'",
+ perm.permission,
+ prefixed_dag_id,
+ role.name,
)
self.del_permission_role(role, perm)
- dag_view = self.find_view_menu(dag_id)
+ dag_view = self.find_view_menu(prefixed_dag_id)
if dag_view:
_revoke_stale_permissions(dag_view)
@@ -641,7 +687,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
if not role:
raise AirflowException(
"The access_control mapping for DAG '{}' includes a role "
- "named '{}', but that role does not exist".format(dag_id,
rolename)
+ "named '{}', but that role does not
exist".format(prefixed_dag_id, rolename)
)
perms = set(perms)
@@ -650,7 +696,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
raise AirflowException(
"The access_control map for DAG '{}' includes the
following "
"invalid permissions: {}; The set of valid permissions "
- "is: {}".format(dag_id, (perms - self.DAG_PERMS),
self.DAG_PERMS)
+ "is: {}".format(prefixed_dag_id, (perms - self.DAG_PERMS),
self.DAG_PERMS)
)
for perm_name in perms:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 342add0..345d705 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -63,6 +63,7 @@ from airflow.models.baseoperator import BaseOperator
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.taskinstance import TaskInstance
+from airflow.security import permissions
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import RUNNING_DEPS,
SCHEDULER_QUEUED_DEPS
from airflow.utils import timezone
@@ -461,7 +462,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
if arg_tags_filter:
dags_query =
dags_query.filter(DagModel.tags.any(DagTag.name.in_(arg_tags_filter)))
- if 'all_dags' not in filter_dag_ids:
+ if permissions.RESOURCE_DAGS not in filter_dag_ids:
dags_query =
dags_query.filter(DagModel.dag_id.in_(filter_dag_ids))
# pylint: enable=no-member
@@ -545,7 +546,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
dr = models.DagRun
allowed_dag_ids =
current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
- if 'all_dags' in allowed_dag_ids:
+ if permissions.RESOURCE_DAGS in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in
session.query(models.DagModel.dag_id)]
dag_state_stats = session.query(dr.dag_id, dr.state,
sqla.func.count(dr.state))\
@@ -594,7 +595,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
if not allowed_dag_ids:
return wwwutils.json_response({})
- if 'all_dags' in allowed_dag_ids:
+ if permissions.RESOURCE_DAGS in allowed_dag_ids:
allowed_dag_ids = {dag_id for dag_id, in
session.query(models.DagModel.dag_id)}
# Filter by post parameters
@@ -705,7 +706,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
"""Last DAG runs"""
allowed_dag_ids =
current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
- if 'all_dags' in allowed_dag_ids:
+ if permissions.RESOURCE_DAGS in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in
session.query(models.DagModel.dag_id)]
# Filter by post parameters
@@ -1389,7 +1390,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
"""Mark Dag Blocked."""
allowed_dag_ids =
current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
- if 'all_dags' in allowed_dag_ids:
+ if permissions.RESOURCE_DAGS in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in
session.query(models.DagModel.dag_id)]
# Filter by post parameters
@@ -3142,7 +3143,7 @@ class DagModelView(AirflowModelView):
filter_dag_ids =
current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
# pylint: disable=no-member
- if not bool({'all_dags', 'Dag'}.intersection(filter_dag_ids)):
+ if permissions.RESOURCE_DAGS not in filter_dag_ids:
dag_ids_query =
dag_ids_query.filter(DagModel.dag_id.in_(filter_dag_ids))
owners_query =
owners_query.filter(DagModel.dag_id.in_(filter_dag_ids))
# pylint: enable=no-member
diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py
b/tests/api_connexion/endpoints/test_dag_endpoint.py
index ee75dbc..692159e 100644
--- a/tests/api_connexion/endpoints/test_dag_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_endpoint.py
@@ -25,6 +25,7 @@ from airflow.api_connexion.exceptions import
EXCEPTIONS_LINK_MAP
from airflow.models import DagBag, DagModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.dummy_operator import DummyOperator
+from airflow.security import permissions
from airflow.utils.session import provide_session
from airflow.www import app
from tests.test_utils.api_connexion_utils import assert_401, create_user,
delete_user
@@ -52,10 +53,10 @@ class TestDagEndpoint(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
- ("can_create", "Dag"),
- ("can_read", "Dag"),
- ("can_edit", "Dag"),
- ("can_delete", "Dag"),
+ ("can_create", permissions.RESOURCE_DAGS),
+ ("can_read", permissions.RESOURCE_DAGS),
+ ("can_edit", permissions.RESOURCE_DAGS),
+ ("can_delete", permissions.RESOURCE_DAGS),
],
)
create_user(cls.app, username="test_no_permissions",
role_name="TestNoPermissions") # type: ignore
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 2b4c263..8008bee 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -21,6 +21,7 @@ from parameterized import parameterized
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models import DagModel, DagRun
+from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
from airflow.utils.types import DagRunType
@@ -42,7 +43,7 @@ class TestDagRunEndpoint(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
- ("can_read", "Dag"),
+ ("can_read", permissions.RESOURCE_DAGS),
("can_create", "DagRun"),
("can_read", "DagRun"),
("can_edit", "DagRun"),
diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
index afd73cd..4fef5bf 100644
--- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
+++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
@@ -27,6 +27,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.xcom import XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.operators.bigquery import
BigQueryExecuteQueryOperator
+from airflow.security import permissions
from airflow.utils.dates import days_ago
from airflow.utils.session import provide_session
from airflow.utils.timezone import datetime
@@ -51,7 +52,7 @@ class TestGetExtraLinks(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
- ('can_read', 'Dag'),
+ ('can_read', permissions.RESOURCE_DAGS),
('can_read', 'DagRun'),
('can_read', 'Task'),
('can_read', 'TaskInstance'),
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py
b/tests/api_connexion/endpoints/test_log_endpoint.py
index fffcf8e..5ece059 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -31,6 +31,7 @@ from airflow.api_connexion.exceptions import
EXCEPTIONS_LINK_MAP
from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
from airflow.models import DagRun, TaskInstance
from airflow.operators.dummy_operator import DummyOperator
+from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
from airflow.utils.types import DagRunType
@@ -56,7 +57,11 @@ class TestGetLog(unittest.TestCase):
cls.app,
username="test",
role_name="Test",
- permissions=[('can_read', 'Dag'), ('can_read', 'DagRun'),
('can_read', 'Task')],
+ permissions=[
+ ('can_read', permissions.RESOURCE_DAGS),
+ ('can_read', 'DagRun'),
+ ('can_read', 'Task'),
+ ],
)
create_user(cls.app, username="test_no_permissions",
role_name="TestNoPermissions")
diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py
b/tests/api_connexion/endpoints/test_task_endpoint.py
index f599055..939d587 100644
--- a/tests/api_connexion/endpoints/test_task_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_endpoint.py
@@ -22,6 +22,7 @@ from airflow import DAG
from airflow.models import DagBag
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.dummy_operator import DummyOperator
+from airflow.security import permissions
from airflow.www import app
from tests.test_utils.api_connexion_utils import assert_401, create_user,
delete_user
from tests.test_utils.config import conf_vars
@@ -47,7 +48,11 @@ class TestTaskEndpoint(unittest.TestCase):
cls.app, # type: ignore
username="test",
role_name="Test",
- permissions=[('can_read', 'Dag'), ('can_read', 'DagRun'),
('can_read', 'Task')],
+ permissions=[
+ ('can_read', permissions.RESOURCE_DAGS),
+ ('can_read', 'DagRun'),
+ ('can_read', 'Task'),
+ ],
)
create_user(cls.app, username="test_no_permissions",
role_name="TestNoPermissions") # type: ignore
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 06b33dc..9413920 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -21,6 +21,7 @@ from unittest import mock
from parameterized import parameterized
from airflow.models import DagBag, DagRun, TaskInstance, SlaMiss
+from airflow.security import permissions
from airflow.utils.types import DagRunType
from airflow.utils.session import provide_session
from airflow.utils.state import State
@@ -46,7 +47,7 @@ class TestTaskInstanceEndpoint(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
- ('can_read', 'Dag'),
+ ('can_read', permissions.RESOURCE_DAGS),
('can_read', 'DagRun'),
('can_read', 'Task'),
('can_edit', 'Task'),
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py
b/tests/api_connexion/endpoints/test_xcom_endpoint.py
index 368a0e0..b8fc5f7 100644
--- a/tests/api_connexion/endpoints/test_xcom_endpoint.py
+++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py
@@ -19,6 +19,7 @@ import unittest
from parameterized import parameterized
from airflow.models import DagModel, DagRun as DR, XCom
+from airflow.security import permissions
from airflow.utils.dates import parse_execution_date
from airflow.utils.session import provide_session
from airflow.utils.types import DagRunType
@@ -40,7 +41,7 @@ class TestXComEndpoint(unittest.TestCase):
username="test",
role_name="Test",
permissions=[
- ("can_read", "Dag"),
+ ("can_read", permissions.RESOURCE_DAGS),
("can_read", "DagRun"),
("can_read", "Task"),
("can_read", "XCom"),
diff --git a/tests/cli/commands/test_sync_perm_command.py
b/tests/cli/commands/test_sync_perm_command.py
index ce87023..8d2f117 100644
--- a/tests/cli/commands/test_sync_perm_command.py
+++ b/tests/cli/commands/test_sync_perm_command.py
@@ -38,7 +38,7 @@ class TestCliSyncPerm(unittest.TestCase):
self.expect_dagbag_contains([
DAG('has_access_control',
access_control={
- 'Public': {'can_dag_read'}
+ 'Public': {'can_read'}
}),
DAG('no_access_control')
], dagbag_mock)
@@ -56,7 +56,7 @@ class TestCliSyncPerm(unittest.TestCase):
self.assertEqual(2, len(appbuilder.sm.sync_perm_for_dag.mock_calls))
appbuilder.sm.sync_perm_for_dag.assert_any_call(
'has_access_control',
- {'Public': {'can_dag_read'}}
+ {'Public': {'can_read'}}
)
appbuilder.sm.sync_perm_for_dag.assert_any_call(
'no_access_control',
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 8a11cf7..3280922 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -31,6 +31,7 @@ from unittest import mock
from unittest.mock import patch
import pendulum
+import pytest
from dateutil.relativedelta import relativedelta
from freezegun import freeze_time
from parameterized import parameterized
@@ -1653,6 +1654,18 @@ class TestDag(unittest.TestCase):
next_subdag_date = subdag.next_dagrun_after_date(None)
assert next_subdag_date is None, "SubDags should never have DagRuns
created by the scheduler"
+ def test_replace_outdated_access_control_actions(self):
+ outdated_permissions = {'role1': {'can_read', 'can_edit'}, 'role2':
{'can_dag_read', 'can_dag_edit'}}
+ updated_permissions = {'role1': {'can_read', 'can_edit'}, 'role2':
{'can_read', 'can_edit'}}
+
+ with pytest.warns(DeprecationWarning):
+ dag = DAG(dag_id='dag_with_outdated_perms',
access_control=outdated_permissions)
+ self.assertEqual(dag.access_control, updated_permissions)
+
+ with pytest.warns(DeprecationWarning):
+ dag.access_control = outdated_permissions
+ self.assertEqual(dag.access_control, updated_permissions)
+
class TestDagModel:
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index 93d1110..ed85ded 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -132,8 +132,8 @@ serialized_simple_dag_ground_truth = {
"test_role": {
"__type": "set",
"__var": [
- "can_dag_read",
- "can_dag_edit"
+ "can_read",
+ "can_edit"
]
}
}
@@ -164,7 +164,7 @@ def make_simple_dag():
start_date=datetime(2019, 8, 1),
is_paused_upon_creation=False,
access_control={
- "test_role": {"can_dag_read", "can_dag_edit"}
+ "test_role": {"can_read", "can_edit"}
}
) as dag:
CustomOperator(task_id='custom_task')
diff --git a/tests/test_utils/fab_utils.py b/tests/test_utils/fab_utils.py
new file mode 100644
index 0000000..27c7f67
--- /dev/null
+++ b/tests/test_utils/fab_utils.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
+from airflow.www import security
+
+
+def create_user(app, username, role_name, permissions=None):
+ appbuilder = app.appbuilder
+
+ # Removes user and role so each test has isolated test data.
+ delete_user(app, username)
+ delete_role(app, role_name)
+ role = create_role(app, role_name, permissions)
+
+ return appbuilder.sm.add_user(
+ username=username,
+ first_name=username,
+ last_name=username,
+ email=f"{username}@fab.org",
+ role=role,
+ password=username,
+ )
+
+
+def create_role(app, name, permissions=None):
+ appbuilder = app.appbuilder
+ role = appbuilder.sm.find_role(name)
+ if not role:
+ role = appbuilder.sm.add_role(name)
+ if not permissions:
+ permissions = []
+ for permission in permissions:
+ perm_object = appbuilder.sm.find_permission_view_menu(*permission)
+ appbuilder.sm.add_permission_role(role, perm_object)
+ return role
+
+
+def delete_role(app, name):
+ if app.appbuilder.sm.find_role(name):
+ app.appbuilder.sm.delete_role(name)
+
+
+def delete_roles(app):
+ for role in app.appbuilder.sm.get_all_roles():
+ if role.name not in security.EXISTING_ROLES:
+ app.appbuilder.sm.delete_role(role.name)
+
+
+def delete_user(app, username):
+ appbuilder = app.appbuilder
+ for user in appbuilder.sm.get_all_users():
+ if user.username == username:
+ for role in user.roles:
+ delete_role(app, role.name)
+ appbuilder.sm.del_register_user(user)
+ break
+
+
+def assert_401(response):
+ assert response.status_code == 401
+ assert response.json == {
+ 'detail': None,
+ 'status': 401,
+ 'title': 'Unauthorized',
+ 'type': EXCEPTIONS_LINK_MAP[401],
+ }
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index fc7f57a..f3529b6 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -28,13 +28,15 @@ from sqlalchemy import Column, Date, Float, Integer, String
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.models import DagModel
+from airflow.security import permissions
from airflow.www import app as application
from airflow.www.utils import CustomSQLAInterface
-from tests.test_utils.db import clear_db_runs
+from tests.test_utils import fab_utils
+from tests.test_utils.db import clear_db_dags, clear_db_runs
from tests.test_utils.mock_security_manager import MockSecurityManager
-READ_WRITE = {'can_dag_read', 'can_dag_edit'}
-READ_ONLY = {'can_dag_read'}
+READ_WRITE = {'can_read', 'can_edit'}
+READ_ONLY = {'can_read'}
logging.basicConfig(format='%(asctime)s:%(levelname)s:%(name)s:%(message)s')
logging.getLogger().setLevel(logging.DEBUG)
@@ -76,18 +78,22 @@ class TestSecurity(unittest.TestCase):
cls.appbuilder = cls.app.appbuilder # pylint: disable=no-member
cls.app.config['WTF_CSRF_ENABLED'] = False
cls.security_manager = cls.appbuilder.sm
- cls.role_admin = cls.security_manager.find_role('Admin')
- cls.user = cls.appbuilder.sm.add_user(
- 'admin', 'admin', 'user', '[email protected]', cls.role_admin,
'general'
- )
+ cls.delete_roles()
def setUp(self):
+ clear_db_runs()
+ clear_db_dags()
self.db = SQLA(self.app)
self.appbuilder.add_view(SomeBaseView, "SomeBaseView",
category="BaseViews")
self.appbuilder.add_view(SomeModelView, "SomeModelView",
category="ModelViews")
log.debug("Complete setup!")
+ @classmethod
+ def delete_roles(cls):
+ for role_name in ['team-a', 'MyRole1', 'MyRole5', 'Test_Role',
'MyRole3', 'MyRole2']:
+ fab_utils.delete_role(cls.app, role_name)
+
def expect_user_is_in_role(self, user, rolename):
self.security_manager.init_role(rolename, [], [])
role = self.security_manager.find_role(rolename)
@@ -97,26 +103,28 @@ class TestSecurity(unittest.TestCase):
user.roles = [role]
self.security_manager.update_user(user)
- def assert_user_has_dag_perms(self, perms, dag_id):
+ def assert_user_has_dag_perms(self, perms, dag_id, user=None):
for perm in perms:
self.assertTrue(
- self._has_dag_perm(perm, dag_id),
- "User should have '{}' on DAG '{}'".format(perm, dag_id))
+ self._has_dag_perm(perm, dag_id, user),
+ "User should have '{}' on DAG '{}'".format(perm, dag_id),
+ )
- def assert_user_does_not_have_dag_perms(self, dag_id, perms):
+ def assert_user_does_not_have_dag_perms(self, dag_id, perms, user=None):
for perm in perms:
self.assertFalse(
- self._has_dag_perm(perm, dag_id),
- "User should not have '{}' on DAG '{}'".format(perm, dag_id))
+ self._has_dag_perm(perm, dag_id, user),
+ "User should not have '{}' on DAG '{}'".format(perm, dag_id),
+ )
- def _has_dag_perm(self, perm, dag_id):
- return self.security_manager.has_access(
- perm,
- dag_id,
- self.user)
+ def _has_dag_perm(self, perm, dag_id, user):
+ # if not user:
+ # user = self.user
+ return self.security_manager.has_access(perm,
self.security_manager.prefixed_dag_id(dag_id), user)
def tearDown(self):
clear_db_runs()
+ clear_db_dags()
self.appbuilder = None
self.app = None
self.db = None
@@ -145,8 +153,7 @@ class TestSecurity(unittest.TestCase):
self.security_manager.init_role(role_name, [], [])
role = self.security_manager.find_role(role_name)
- perm = self.security_manager.\
- find_permission_view_menu('can_edit', 'RoleModelView')
+ perm = self.security_manager.find_permission_view_menu('can_edit',
'RoleModelView')
self.security_manager.add_permission_role(role, perm)
role_perms_len = len(role.permissions)
@@ -165,43 +172,45 @@ class TestSecurity(unittest.TestCase):
@mock.patch('airflow.www.security.AirflowSecurityManager.get_user_roles')
def test_get_all_permissions_views(self, mock_get_user_roles):
role_name = 'MyRole5'
- role_perms = ['can_some_action']
- role_vms = ['SomeBaseView']
- self.security_manager.init_role(role_name, role_vms, role_perms)
- role = self.security_manager.find_role(role_name)
+ role_perm = 'can_some_action'
+ role_vm = 'SomeBaseView'
+ username = 'get_all_permissions_views'
+
+ with self.app.app_context():
+ user = fab_utils.create_user(self.app, username, role_name,
permissions=[(role_perm, role_vm),],)
+ role = user.roles[0]
+ mock_get_user_roles.return_value = [role]
- mock_get_user_roles.return_value = [role]
- self.assertEqual(self.security_manager
- .get_all_permissions_views(),
- {('can_some_action', 'SomeBaseView')})
+
self.assertEqual(self.security_manager.get_all_permissions_views(),
{(role_perm, role_vm)})
- mock_get_user_roles.return_value = []
- self.assertEqual(len(self.security_manager
- .get_all_permissions_views()), 0)
+ mock_get_user_roles.return_value = []
+
self.assertEqual(len(self.security_manager.get_all_permissions_views()), 0)
def test_get_accessible_dag_ids(self):
role_name = 'MyRole1'
- permission_action = ['can_dag_read']
+ permission_action = ['can_read']
dag_id = 'dag_id'
- username = "Mr. User"
- self.security_manager.init_role(role_name, [], [])
- self.security_manager.sync_perm_for_dag( # type: ignore # pylint:
disable=no-member
- dag_id, access_control={role_name: permission_action}
- )
- role = self.security_manager.find_role(role_name)
- user = self.security_manager.add_user(
- username=username,
- first_name=username,
- last_name=username,
- email=f"{username}@fab.org",
- role=role,
- password=username,
+ username = "ElUser"
+
+ user = fab_utils.create_user(
+ self.app,
+ username,
+ role_name,
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
+ ],
)
- dag_model = DagModel(dag_id="dag_id", fileloc="/tmp/dag_.py",
schedule_interval="2 2 * * *")
+
+ dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py",
schedule_interval="2 2 * * *")
self.session.add(dag_model)
self.session.commit()
- self.assertEqual(self.security_manager
- .get_accessible_dag_ids(user), {'dag_id'})
+
+ self.security_manager.sync_perm_for_dag( # type: ignore # pylint:
disable=no-member
+ dag_id, access_control={role_name: permission_action}
+ )
+
+ self.assertEqual(self.security_manager.get_accessible_dag_ids(user),
{'dag_id'})
@mock.patch('airflow.www.security.AirflowSecurityManager._has_view_access')
def test_has_access(self, mock_has_view_access):
@@ -212,10 +221,14 @@ class TestSecurity(unittest.TestCase):
def test_sync_perm_for_dag_creates_permissions_on_view_menus(self):
test_dag_id = 'TEST_DAG'
+ prefixed_test_dag_id = f'DAG:{test_dag_id}'
self.security_manager.sync_perm_for_dag(test_dag_id,
access_control=None)
- for dag_perm in self.security_manager.DAG_PERMS:
- self.assertIsNotNone(self.security_manager.
- find_permission_view_menu(dag_perm,
test_dag_id))
+ self.assertIsNotNone(
+ self.security_manager.find_permission_view_menu('can_read',
prefixed_test_dag_id)
+ )
+ self.assertIsNotNone(
+ self.security_manager.find_permission_view_menu('can_edit',
prefixed_test_dag_id)
+ )
@mock.patch('airflow.www.security.AirflowSecurityManager._has_perm')
@mock.patch('airflow.www.security.AirflowSecurityManager._has_role')
@@ -234,74 +247,79 @@ class TestSecurity(unittest.TestCase):
with self.assertRaises(AirflowException) as context:
self.security_manager.sync_perm_for_dag(
dag_id='access-control-test',
- access_control={
- 'this-role-does-not-exist': ['can_dag_edit',
'can_dag_read']
- })
+ access_control={'this-role-does-not-exist': ['can_edit',
'can_read']},
+ )
self.assertIn("role does not exist", str(context.exception))
+ def test_all_dag_access_doesnt_give_non_dag_access(self):
+ username = 'dag_access_user'
+ role_name = 'dag_access_role'
+ with self.app.app_context():
+ user = fab_utils.create_user(
+ self.app,
+ username,
+ role_name,
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
+ ],
+ )
+ self.assertTrue(
+ self.security_manager.has_access(permissions.ACTION_CAN_READ,
permissions.RESOURCE_DAGS, user)
+ )
+
self.assertFalse(self.security_manager.has_access(permissions.ACTION_CAN_READ,
'Task', user))
+
def test_access_control_with_invalid_permission(self):
invalid_permissions = [
'can_varimport', # a real permission, but not a member of
DAG_PERMS
'can_eat_pudding', # clearly not a real permission
]
- username = "Mrs. User"
- user = self.security_manager.add_user(
- username=username,
- first_name=username,
- last_name=username,
- email=f"{username}@fab.org",
- role=self.role_admin,
- password=username,
- )
+ username = "LaUser"
+ user = fab_utils.create_user(self.app, username=username,
role_name='team-a',)
for permission in invalid_permissions:
self.expect_user_is_in_role(user, rolename='team-a')
with self.assertRaises(AirflowException) as context:
self.security_manager.sync_perm_for_dag(
- 'access_control_test',
- access_control={
- 'team-a': {permission}
- })
+ 'access_control_test', access_control={'team-a':
{permission}}
+ )
self.assertIn("invalid permissions", str(context.exception))
def test_access_control_is_set_on_init(self):
- self.expect_user_is_in_role(self.user, rolename='team-a')
- self.security_manager.sync_perm_for_dag(
- 'access_control_test',
- access_control={
- 'team-a': ['can_dag_edit', 'can_dag_read']
- })
- self.assert_user_has_dag_perms(
- perms=['can_dag_edit', 'can_dag_read'],
- dag_id='access_control_test',
- )
+ username = 'access_control_is_set_on_init'
+ role_name = 'team-a'
+ with self.app.app_context():
+ user = fab_utils.create_user(self.app, username, role_name,
permissions=[],)
+ self.expect_user_is_in_role(user, rolename='team-a')
+ self.security_manager.sync_perm_for_dag(
+ 'access_control_test', access_control={'team-a': ['can_edit',
'can_read']}
+ )
+ self.assert_user_has_dag_perms(
+ perms=['can_edit', 'can_read'], dag_id='access_control_test',
user=user
+ )
- self.expect_user_is_in_role(self.user, rolename='NOT-team-a')
- self.assert_user_does_not_have_dag_perms(
- perms=['can_dag_edit', 'can_dag_read'],
- dag_id='access_control_test',
- )
+ self.expect_user_is_in_role(user, rolename='NOT-team-a')
+ self.assert_user_does_not_have_dag_perms(
+ perms=['can_edit', 'can_read'], dag_id='access_control_test',
user=user
+ )
def test_access_control_stale_perms_are_revoked(self):
- self.expect_user_is_in_role(self.user, rolename='team-a')
- self.security_manager.sync_perm_for_dag(
- 'access_control_test',
- access_control={'team-a': READ_WRITE})
- self.assert_user_has_dag_perms(
- perms=READ_WRITE,
- dag_id='access_control_test',
- )
+ username = 'access_control_stale_perms_are_revoked'
+ role_name = 'team-a'
+ with self.app.app_context():
+ user = fab_utils.create_user(self.app, username, role_name,
permissions=[],)
+ self.expect_user_is_in_role(user, rolename='team-a')
+ self.security_manager.sync_perm_for_dag(
+ 'access_control_test', access_control={'team-a': READ_WRITE}
+ )
+ self.assert_user_has_dag_perms(perms=READ_WRITE,
dag_id='access_control_test', user=user)
- self.security_manager.sync_perm_for_dag(
- 'access_control_test',
- access_control={'team-a': READ_ONLY})
- self.assert_user_has_dag_perms(
- perms=['can_dag_read'],
- dag_id='access_control_test',
- )
- self.assert_user_does_not_have_dag_perms(
- perms=['can_dag_edit'],
- dag_id='access_control_test',
- )
+ self.security_manager.sync_perm_for_dag(
+ 'access_control_test', access_control={'team-a': READ_ONLY}
+ )
+ self.assert_user_has_dag_perms(perms=['can_read'],
dag_id='access_control_test', user=user)
+ self.assert_user_does_not_have_dag_perms(
+ perms=['can_edit'], dag_id='access_control_test', user=user
+ )
def test_no_additional_dag_permission_views_created(self):
ab_perm_view_role = sqla_models.assoc_permissionview_role
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index bb6aeb6..e66b3d8 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -52,6 +52,7 @@ from airflow.models.renderedtifields import
RenderedTaskInstanceFields as RTIF
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
+from airflow.security import permissions
from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES,
RUNNABLE_STATES
from airflow.utils import dates, timezone
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
@@ -141,7 +142,6 @@ class TestBase(unittest.TestCase):
role=self.appbuilder.sm.find_role('Admin'),
password='test',
)
-
if username == 'test_user' and not
self.appbuilder.sm.find_user(username='test_user'):
self.appbuilder.sm.add_user(
username='test_user',
@@ -1649,6 +1649,10 @@ class TestDagACLView(TestBase):
super().setUpClass()
dagbag = models.DagBag(include_examples=True)
DAG.bulk_write_to_db(dagbag.dags.values())
+ for username in ['all_dag_user', 'dag_read_only', 'dag_faker',
'dag_tester']:
+ user = cls.appbuilder.sm.find_user(username=username)
+ if user:
+ cls.appbuilder.sm.del_register_user(user)
def prepare_dagruns(self):
dagbag = models.DagBag(include_examples=True)
@@ -1729,21 +1733,28 @@ class TestDagACLView(TestBase):
self.logout()
self.login(username='test',
password='test')
- perm_on_dag = self.appbuilder.sm.\
- find_permission_view_menu('can_dag_edit', 'example_bash_operator')
dag_tester_role = self.appbuilder.sm.find_role('dag_acl_tester')
- self.appbuilder.sm.add_permission_role(dag_tester_role, perm_on_dag)
+ edit_perm_on_dag = self.appbuilder.sm.\
+ find_permission_view_menu('can_edit', 'DAG:example_bash_operator')
+ self.appbuilder.sm.add_permission_role(dag_tester_role,
edit_perm_on_dag)
+ read_perm_on_dag = self.appbuilder.sm.\
+ find_permission_view_menu('can_read', 'DAG:example_bash_operator')
+ self.appbuilder.sm.add_permission_role(dag_tester_role,
read_perm_on_dag)
- perm_on_all_dag = self.appbuilder.sm.\
- find_permission_view_menu('can_dag_edit', 'all_dags')
all_dag_role = self.appbuilder.sm.find_role('all_dag_role')
- self.appbuilder.sm.add_permission_role(all_dag_role, perm_on_all_dag)
+ edit_perm_on_all_dag = self.appbuilder.sm.\
+ find_permission_view_menu('can_edit', permissions.RESOURCE_DAGS)
+ self.appbuilder.sm.add_permission_role(all_dag_role,
edit_perm_on_all_dag)
+ read_perm_on_all_dag = self.appbuilder.sm.\
+ find_permission_view_menu('can_read', permissions.RESOURCE_DAGS)
+ self.appbuilder.sm.add_permission_role(all_dag_role,
read_perm_on_all_dag)
role_user = self.appbuilder.sm.find_role('User')
- self.appbuilder.sm.add_permission_role(role_user, perm_on_all_dag)
+ self.appbuilder.sm.add_permission_role(role_user, read_perm_on_all_dag)
+ self.appbuilder.sm.add_permission_role(role_user, edit_perm_on_all_dag)
read_only_perm_on_dag = self.appbuilder.sm.\
- find_permission_view_menu('can_dag_read', 'example_bash_operator')
+ find_permission_view_menu('can_read', 'DAG:example_bash_operator')
dag_read_only_role = self.appbuilder.sm.find_role('dag_acl_read_only')
self.appbuilder.sm.add_permission_role(dag_read_only_role,
read_only_perm_on_dag)
@@ -1751,19 +1762,17 @@ class TestDagACLView(TestBase):
self.logout()
self.login(username='test',
password='test')
- test_view_menu =
self.appbuilder.sm.find_view_menu('example_bash_operator')
+ test_view_menu =
self.appbuilder.sm.find_view_menu('DAG:example_bash_operator')
perms_views =
self.appbuilder.sm.find_permissions_view_menu(test_view_menu)
- self.assertEqual(len(perms_views), 4)
-
- permissions = [str(perm) for perm in perms_views]
- expected_permissions = [
- 'can read on example_bash_operator',
- 'can dag edit on example_bash_operator',
- 'can dag read on example_bash_operator',
- 'can edit on example_bash_operator',
+ self.assertEqual(len(perms_views), 2)
+
+ perms = [str(perm) for perm in perms_views]
+ expected_perms = [
+ 'can read on DAG:example_bash_operator',
+ 'can edit on DAG:example_bash_operator',
]
- for perm in expected_permissions:
- self.assertIn(perm, permissions)
+ for perm in expected_perms:
+ self.assertIn(perm, perms)
def test_role_permission_associate(self):
self.logout()
@@ -1771,8 +1780,8 @@ class TestDagACLView(TestBase):
password='test')
test_role = self.appbuilder.sm.find_role('dag_acl_tester')
perms = {str(perm) for perm in test_role.permissions}
- self.assertIn('can dag edit on example_bash_operator', perms)
- self.assertNotIn('can dag read on example_bash_operator', perms)
+ self.assertIn('can edit on DAG:example_bash_operator', perms)
+ self.assertIn('can read on DAG:example_bash_operator', perms)
def test_index_success(self):
self.logout()
@@ -2175,7 +2184,7 @@ class TestDagACLView(TestBase):
self.check_content_not_in_response('example_bash_operator', resp)
def test_success_fail_for_read_only_role(self):
- # success endpoint need can_dag_edit, which read only role can not
access
+ # success endpoint need can_edit, which read only role can not access
self.logout()
self.login(username='dag_read_only',
password='dag_read_only')
@@ -2193,7 +2202,7 @@ class TestDagACLView(TestBase):
self.check_content_not_in_response('Wait a minute', resp,
resp_code=302)
def test_tree_success_for_read_only_role(self):
- # tree view only allows can_dag_read, which read only role could access
+ # tree view only allows can_read, which read only role could access
self.logout()
self.login(username='dag_read_only',
password='dag_read_only')