This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d7d944e381 Fix: Keep compatibility with old FAB versions (#41549)
d7d944e381 is described below

commit d7d944e3818baf98c310314e369d767866012939
Author: Joao Amaral <[email protected]>
AuthorDate: Tue Aug 20 16:42:05 2024 -0300

    Fix: Keep compatibility with old FAB versions (#41549)
---
 airflow/models/dag.py    | 30 +++++++++++++++++--------
 tests/models/test_dag.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 78 insertions(+), 9 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 518b367067..b685b28343 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -57,6 +57,7 @@ import pendulum
 import re2
 import sqlalchemy_jsonfield
 from dateutil.relativedelta import relativedelta
+from packaging import version as packaging_version
 from sqlalchemy import (
     Boolean,
     Column,
@@ -116,6 +117,7 @@ from airflow.models.taskinstance import (
     clear_task_instances,
 )
 from airflow.models.tasklog import LogTemplate
+from airflow.providers.fab import __version__ as FAB_VERSION
 from airflow.secrets.local_filesystem import LocalFilesystemBackend
 from airflow.security import permissions
 from airflow.settings import json
@@ -936,16 +938,26 @@ class DAG(LoggingMixin):
 
         updated_access_control = {}
         for role, perms in access_control.items():
-            updated_access_control[role] = updated_access_control.get(role, {})
-            if isinstance(perms, (set, list)):
-                # Support for old-style access_control where only the actions 
are specified
-                updated_access_control[role][permissions.RESOURCE_DAG] = 
set(perms)
+            if packaging_version.parse(FAB_VERSION) >= 
packaging_version.parse("1.3.0"):
+                updated_access_control[role] = 
updated_access_control.get(role, {})
+                if isinstance(perms, (set, list)):
+                    # Support for old-style access_control where only the 
actions are specified
+                    updated_access_control[role][permissions.RESOURCE_DAG] = 
set(perms)
+                else:
+                    updated_access_control[role] = perms
+                if permissions.RESOURCE_DAG in updated_access_control[role]:
+                    updated_access_control[role][permissions.RESOURCE_DAG] = {
+                        update_old_perm(perm)
+                        for perm in 
updated_access_control[role][permissions.RESOURCE_DAG]
+                    }
+            elif isinstance(perms, dict):
+                # Not allow new access control format with old FAB versions
+                raise AirflowException(
+                    "Please upgrade the FAB provider to a version >= 1.3.0 to 
allow "
+                    "use the Dag Level Access Control new format."
+                )
             else:
-                updated_access_control[role] = perms
-            if permissions.RESOURCE_DAG in updated_access_control[role]:
-                updated_access_control[role][permissions.RESOURCE_DAG] = {
-                    update_old_perm(perm) for perm in 
updated_access_control[role][permissions.RESOURCE_DAG]
-                }
+                updated_access_control[role] = {update_old_perm(perm) for perm 
in perms}
 
         return updated_access_control
 
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 0bd58ddc52..4994e4545d 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2539,6 +2539,63 @@ my_postgres_conn:
         assert "permission is deprecated" in 
str(deprecation_warnings[0].message)
         assert "permission is deprecated" in 
str(deprecation_warnings[1].message)
 
+    @pytest.mark.parametrize(
+        "fab_version, perms, expected_exception, expected_perms",
+        [
+            pytest.param(
+                "1.2.0",
+                {
+                    "role1": {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT},
+                    "role3": {permissions.RESOURCE_DAG_RUN: 
{permissions.ACTION_CAN_CREATE}},
+                    # will raise error in old FAB with new access control 
format
+                },
+                AirflowException,
+                None,
+                id="old_fab_new_access_control_format",
+            ),
+            pytest.param(
+                "1.2.0",
+                {
+                    "role1": [
+                        permissions.ACTION_CAN_READ,
+                        permissions.ACTION_CAN_EDIT,
+                        permissions.ACTION_CAN_READ,
+                    ],
+                },
+                None,
+                {"role1": {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT}},
+                id="old_fab_old_access_control_format",
+            ),
+            pytest.param(
+                "1.3.0",
+                {
+                    "role1": {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT},  # old format
+                    "role3": {permissions.RESOURCE_DAG_RUN: 
{permissions.ACTION_CAN_CREATE}},  # new format
+                },
+                None,
+                {
+                    "role1": {
+                        permissions.RESOURCE_DAG: 
{permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}
+                    },
+                    "role3": {permissions.RESOURCE_DAG_RUN: 
{permissions.ACTION_CAN_CREATE}},
+                },
+                id="new_fab_mixed_access_control_format",
+            ),
+        ],
+    )
+    def test_access_control_format(self, fab_version, perms, 
expected_exception, expected_perms):
+        if expected_exception:
+            with patch("airflow.models.dag.FAB_VERSION", fab_version):
+                with pytest.raises(
+                    expected_exception,
+                    match="Please upgrade the FAB provider to a version >= 
1.3.0 to allow use the Dag Level Access Control new format.",
+                ):
+                    DAG(dag_id="dag_test", schedule=None, access_control=perms)
+        else:
+            with patch("airflow.models.dag.FAB_VERSION", fab_version):
+                dag = DAG(dag_id="dag_test", schedule=None, 
access_control=perms)
+            assert dag.access_control == expected_perms
+
     def test_validate_executor_field_executor_not_configured(self):
         dag = DAG("test-dag", schedule=None)
         EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")

Reply via email to