This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d7d944e381 Fix: Keep compatibility with old FAB versions (#41549)
d7d944e381 is described below
commit d7d944e3818baf98c310314e369d767866012939
Author: Joao Amaral <[email protected]>
AuthorDate: Tue Aug 20 16:42:05 2024 -0300
Fix: Keep compatibility with old FAB versions (#41549)
---
airflow/models/dag.py | 30 +++++++++++++++++--------
tests/models/test_dag.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 78 insertions(+), 9 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 518b367067..b685b28343 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -57,6 +57,7 @@ import pendulum
import re2
import sqlalchemy_jsonfield
from dateutil.relativedelta import relativedelta
+from packaging import version as packaging_version
from sqlalchemy import (
Boolean,
Column,
@@ -116,6 +117,7 @@ from airflow.models.taskinstance import (
clear_task_instances,
)
from airflow.models.tasklog import LogTemplate
+from airflow.providers.fab import __version__ as FAB_VERSION
from airflow.secrets.local_filesystem import LocalFilesystemBackend
from airflow.security import permissions
from airflow.settings import json
@@ -936,16 +938,26 @@ class DAG(LoggingMixin):
updated_access_control = {}
for role, perms in access_control.items():
- updated_access_control[role] = updated_access_control.get(role, {})
- if isinstance(perms, (set, list)):
- # Support for old-style access_control where only the actions are specified
- updated_access_control[role][permissions.RESOURCE_DAG] = set(perms)
+ if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0"):
+ updated_access_control[role] = updated_access_control.get(role, {})
+ if isinstance(perms, (set, list)):
+ # Support for old-style access_control where only the actions are specified
+ updated_access_control[role][permissions.RESOURCE_DAG] = set(perms)
+ else:
+ updated_access_control[role] = perms
+ if permissions.RESOURCE_DAG in updated_access_control[role]:
+ updated_access_control[role][permissions.RESOURCE_DAG] = {
+ update_old_perm(perm)
+ for perm in updated_access_control[role][permissions.RESOURCE_DAG]
+ }
+ elif isinstance(perms, dict):
+ # Not allow new access control format with old FAB versions
+ raise AirflowException(
+ "Please upgrade the FAB provider to a version >= 1.3.0 to allow "
+ "use the Dag Level Access Control new format."
+ )
else:
- updated_access_control[role] = perms
- if permissions.RESOURCE_DAG in updated_access_control[role]:
- updated_access_control[role][permissions.RESOURCE_DAG] = {
- update_old_perm(perm) for perm in updated_access_control[role][permissions.RESOURCE_DAG]
- }
+ updated_access_control[role] = {update_old_perm(perm) for perm in perms}
return updated_access_control
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 0bd58ddc52..4994e4545d 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2539,6 +2539,63 @@ my_postgres_conn:
assert "permission is deprecated" in str(deprecation_warnings[0].message)
assert "permission is deprecated" in str(deprecation_warnings[1].message)
+ @pytest.mark.parametrize(
+ "fab_version, perms, expected_exception, expected_perms",
+ [
+ pytest.param(
+ "1.2.0",
+ {
+ "role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT},
+ "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
+ # will raise error in old FAB with new access control format
+ },
+ AirflowException,
+ None,
+ id="old_fab_new_access_control_format",
+ ),
+ pytest.param(
+ "1.2.0",
+ {
+ "role1": [
+ permissions.ACTION_CAN_READ,
+ permissions.ACTION_CAN_EDIT,
+ permissions.ACTION_CAN_READ,
+ ],
+ },
+ None,
+ {"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}},
+ id="old_fab_old_access_control_format",
+ ),
+ pytest.param(
+ "1.3.0",
+ {
+ "role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, # old format
+ "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, # new format
+ },
+ None,
+ {
+ "role1": {
+ permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}
+ },
+ "role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
+ },
+ id="new_fab_mixed_access_control_format",
+ ),
+ ],
+ )
+ def test_access_control_format(self, fab_version, perms, expected_exception, expected_perms):
+ if expected_exception:
+ with patch("airflow.models.dag.FAB_VERSION", fab_version):
+ with pytest.raises(
+ expected_exception,
+ match="Please upgrade the FAB provider to a version >= 1.3.0 to allow use the Dag Level Access Control new format.",
+ ):
+ DAG(dag_id="dag_test", schedule=None, access_control=perms)
+ else:
+ with patch("airflow.models.dag.FAB_VERSION", fab_version):
+ dag = DAG(dag_id="dag_test", schedule=None, access_control=perms)
+ assert dag.access_control == expected_perms
+
def test_validate_executor_field_executor_not_configured(self):
dag = DAG("test-dag", schedule=None)
EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")