Jorricks commented on a change in pull request #16634:
URL: https://github.com/apache/airflow/pull/16634#discussion_r711574274
##########
File path: airflow/www/views.py
##########
@@ -3145,6 +3156,65 @@ class AirflowModelView(ModelView):
CustomSQLAInterface = wwwutils.CustomSQLAInterface
+class AirflowPrivilegeVerifierModelView(AirflowModelView):
+ """
+ This ModelView prevents ability to pass primary keys of objects relating
to DAGs you shouldn't be able to
+ edit. This only holds for the add, update and delete operations.
+ You will still need to use the `action_has_dag_edit_access()` for actions.
+ """
+
+ @staticmethod
+ def validate_dag_edit_access(item: Union[DagRun, TaskInstance]):
+ """Validates whether the user has 'can_edit' access for this specific
DAG."""
+ if not current_app.appbuilder.sm.can_edit_dag(item.dag_id):
+ raise AirflowException(f"Access denied for dag_id {item.dag_id}")
+
+ def pre_add(self, item: Union[DagRun, TaskInstance]):
+ self.validate_dag_edit_access(item)
+
+ def pre_update(self, item: Union[DagRun, TaskInstance]):
+ self.validate_dag_edit_access(item)
+
+ def pre_delete(self, item: Union[DagRun, TaskInstance]):
+ self.validate_dag_edit_access(item)
+
+ def post_add_redirect(self): # Required to prevent redirect loop
+ return redirect(self.get_default_url())
+
+ def post_edit_redirect(self): # Required to prevent redirect loop
+ return redirect(self.get_default_url())
+
+ def post_delete_redirect(self): # Required to prevent redirect loop
+ return redirect(self.get_default_url())
+
+
+def action_has_dag_edit_access(action_func: Callable) -> Callable:
+ """Decorator for actions which verifies you have DAG edit access on the
given tis/drs."""
+
+ @wraps(action_func)
+ def check_dag_edit_acl_for_actions(
+ self,
+ items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance,
DagRun, None]],
+ *args,
+ **kwargs,
+ ) -> None:
+ if items is None:
+ dag_ids: Set[str] = set()
+ elif isinstance(items, list):
+ dag_ids = {item.dag_id for item in items if item is not None}
+ else:
+ dag_ids = items.dag_id
+
+ for dag_id in dag_ids:
Review comment:
I fixed the issue and created a test to just test this specific
decorator.
Interested to hear what you think.
##########
File path: airflow/www/views.py
##########
@@ -3145,6 +3156,65 @@ class AirflowModelView(ModelView):
CustomSQLAInterface = wwwutils.CustomSQLAInterface
+class AirflowPrivilegeVerifierModelView(AirflowModelView):
+ """
+ This ModelView prevents ability to pass primary keys of objects relating
to DAGs you shouldn't be able to
+ edit. This only holds for the add, update and delete operations.
+ You will still need to use the `action_has_dag_edit_access()` for actions.
+ """
+
+ @staticmethod
+ def validate_dag_edit_access(item: Union[DagRun, TaskInstance]):
+ """Validates whether the user has 'can_edit' access for this specific
DAG."""
+ if not current_app.appbuilder.sm.can_edit_dag(item.dag_id):
+ raise AirflowException(f"Access denied for dag_id {item.dag_id}")
+
+ def pre_add(self, item: Union[DagRun, TaskInstance]):
+ self.validate_dag_edit_access(item)
+
+ def pre_update(self, item: Union[DagRun, TaskInstance]):
+ self.validate_dag_edit_access(item)
+
+ def pre_delete(self, item: Union[DagRun, TaskInstance]):
+ self.validate_dag_edit_access(item)
+
+ def post_add_redirect(self): # Required to prevent redirect loop
+ return redirect(self.get_default_url())
+
+ def post_edit_redirect(self): # Required to prevent redirect loop
+ return redirect(self.get_default_url())
+
+ def post_delete_redirect(self): # Required to prevent redirect loop
+ return redirect(self.get_default_url())
+
+
+def action_has_dag_edit_access(action_func: Callable) -> Callable:
+ """Decorator for actions which verifies you have DAG edit access on the
given tis/drs."""
+
+ @wraps(action_func)
+ def check_dag_edit_acl_for_actions(
+ self,
+ items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance,
DagRun, None]],
+ *args,
+ **kwargs,
+ ) -> None:
+ if items is None:
+ dag_ids: Set[str] = set()
+ elif isinstance(items, list):
+ dag_ids = {item.dag_id for item in items if item is not None}
+ else:
+ dag_ids = items.dag_id
+
+ for dag_id in dag_ids:
Review comment:
I fixed the issue you mentioned and created a test to just test this
specific decorator.
Interested to hear what you think.
##########
File path: airflow/www/views.py
##########
@@ -3145,6 +3156,65 @@ class AirflowModelView(ModelView):
CustomSQLAInterface = wwwutils.CustomSQLAInterface
+class AirflowPrivilegeVerifierModelView(AirflowModelView):
+ """
+ This ModelView prevents ability to pass primary keys of objects relating
to DAGs you shouldn't be able to
+ edit. This only holds for the add, update and delete operations.
+ You will still need to use the `action_has_dag_edit_access()` for actions.
+ """
+
+ @staticmethod
+ def validate_dag_edit_access(item: Union[DagRun, TaskInstance]):
+ """Validates whether the user has 'can_edit' access for this specific
DAG."""
+ if not current_app.appbuilder.sm.can_edit_dag(item.dag_id):
+ raise AirflowException(f"Access denied for dag_id {item.dag_id}")
+
+ def pre_add(self, item: Union[DagRun, TaskInstance]):
+ self.validate_dag_edit_access(item)
+
+ def pre_update(self, item: Union[DagRun, TaskInstance]):
+ self.validate_dag_edit_access(item)
+
+ def pre_delete(self, item: Union[DagRun, TaskInstance]):
+ self.validate_dag_edit_access(item)
+
+ def post_add_redirect(self): # Required to prevent redirect loop
+ return redirect(self.get_default_url())
+
+ def post_edit_redirect(self): # Required to prevent redirect loop
+ return redirect(self.get_default_url())
+
+ def post_delete_redirect(self): # Required to prevent redirect loop
+ return redirect(self.get_default_url())
+
+
+def action_has_dag_edit_access(action_func: Callable) -> Callable:
+ """Decorator for actions which verifies you have DAG edit access on the
given tis/drs."""
+
+ @wraps(action_func)
+ def check_dag_edit_acl_for_actions(
+ self,
+ items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance,
DagRun, None]],
+ *args,
+ **kwargs,
+ ) -> None:
+ if items is None:
+ dag_ids: Set[str] = set()
+ elif isinstance(items, list):
+ dag_ids = {item.dag_id for item in items if item is not None}
+ else:
+ dag_ids = items.dag_id
+
+ for dag_id in dag_ids:
Review comment:
I fixed the issue you mentioned and created a test for this specific
decorator.
Interested to hear what you think.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]