This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 87986f3f12109f7144fbc20a1c9dcde07f3c4f58 Author: Jacob Ferriero <[email protected]> AuthorDate: Wed Aug 12 15:06:29 2020 -0700 Add ClusterPolicyViolation support to airflow local settings (#10282) This change will allow users to throw other exceptions (namely `AirflowClusterPolicyViolation`) than `DagCycleException` as part of Cluster Policies. This can be helpful for running checks on tasks / DAGs (e.g. asserting task has a non-airflow owner) and failing to run tasks that aren't compliant with these checks. This is meant more as a tool for airflow admins to prevent user mistakes (especially in shared Airflow infrastructure with newbies) than as a strong technical control for security/compliance posture. (cherry picked from commit 7f76b8b94241c57dc7de5b17657433841289744e) --- airflow/exceptions.py | 4 +++ airflow/models/dagbag.py | 7 ++-- docs/concepts.rst | 36 ++++++++++++++++++++ tests/cluster_policies/__init__.py | 56 +++++++++++++++++++++++++++++++ tests/dags/test_missing_owner.py | 32 ++++++++++++++++++ tests/dags/test_with_non_default_owner.py | 32 ++++++++++++++++++ tests/models/test_dagbag.py | 32 ++++++++++++++++++ 7 files changed, 196 insertions(+), 3 deletions(-) diff --git a/airflow/exceptions.py b/airflow/exceptions.py index badf156..51e3cb4 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -83,6 +83,10 @@ class AirflowDagCycleException(AirflowException): """Raise when there is a cycle in Dag definition""" +class AirflowClusterPolicyViolation(AirflowException): + """Raise when there is a violation of a Cluster Policy in Dag definition""" + + class DagNotFound(AirflowNotFoundException): """Raise when a DAG is not available in the system""" diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 1b8be89..106dff0 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -36,7 +36,7 @@ import six from airflow import settings from airflow.configuration import conf from airflow.dag.base_dag import BaseDagBag -from 
airflow.exceptions import AirflowDagCycleException +from airflow.exceptions import AirflowClusterPolicyViolation, AirflowDagCycleException from airflow.executors import get_default_executor from airflow.settings import Stats from airflow.utils import timezone @@ -317,9 +317,10 @@ class DagBag(BaseDagBag, LoggingMixin): "Invalid Cron expression: " + str(cron_e) self.file_last_changed[dag.full_filepath] = \ file_last_changed_on_disk - except AirflowDagCycleException as cycle_exception: + except (AirflowDagCycleException, + AirflowClusterPolicyViolation) as exception: self.log.exception("Failed to bag_dag: %s", dag.full_filepath) - self.import_errors[dag.full_filepath] = str(cycle_exception) + self.import_errors[dag.full_filepath] = str(exception) self.file_last_changed[dag.full_filepath] = \ file_last_changed_on_disk diff --git a/docs/concepts.rst b/docs/concepts.rst index dd48003..6908ab2 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -1066,7 +1066,11 @@ state. Cluster Policy ============== +Cluster policies provide an interface for taking action on every Airflow task +either at DAG load time or just before task execution. +Cluster Policies for Task Mutation +----------------------------------- In case you want to apply cluster-wide mutations to the Airflow tasks, you can either mutate the task right after the DAG is loaded or mutate the task instance before task execution. @@ -1117,6 +1121,38 @@ queue during retries: ti.queue = 'retry_queue' +Cluster Policies for Custom Task Checks +------------------------------------------- +You may also use Cluster Policies to apply cluster-wide checks on Airflow +tasks. You can raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation` +in a policy or task mutation hook (described below) to prevent a DAG from being +imported or prevent a task from being executed if the task is not compliant with +your check. 
+ +These checks are intended to help teams using Airflow to protect against common +beginner errors that may get past a code reviewer, rather than as technical +security controls. + +For example, don't run tasks without airflow owners: + +.. literalinclude:: /../tests/cluster_policies/__init__.py + :language: python + :start-after: [START example_cluster_policy_rule] + :end-before: [END example_cluster_policy_rule] + +If you have multiple checks to apply, it is best practice to curate these rules +in a separate python module and have a single policy / task mutation hook that +performs multiple of these custom checks and aggregates the various error +messages so that a single ``AirflowClusterPolicyViolation`` can be reported in +the UI (and import errors table in the database). + +For example, in ``airflow_local_settings.py``: + +.. literalinclude:: /../tests/cluster_policies/__init__.py + :language: python + :start-after: [START example_list_of_cluster_policy_rules] + :end-before: [END example_list_of_cluster_policy_rules] + Where to put ``airflow_local_settings.py``? ------------------------------------------- diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py new file mode 100644 index 0000000..565b6f8 --- /dev/null +++ b/tests/cluster_policies/__init__.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Callable, List + +from airflow.configuration import conf +from airflow.exceptions import AirflowClusterPolicyViolation +from airflow.models.baseoperator import BaseOperator + + +# [START example_cluster_policy_rule] +def task_must_have_owners(task): + if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'): + raise AirflowClusterPolicyViolation( + 'Task must have non-None non-default owner. Current value: {}'.format(task.owner)) +# [END example_cluster_policy_rule] + + +# [START example_list_of_cluster_policy_rules] +TASK_RULES = [task_must_have_owners] # type: List[Callable[[BaseOperator], None]] + + +def _check_task_rules(current_task): + """Check task rules for given task.""" + notices = [] + for rule in TASK_RULES: + try: + rule(current_task) + except AirflowClusterPolicyViolation as ex: + notices.append(str(ex)) + if notices: + notices_list = " * " + "\n * ".join(notices) + raise AirflowClusterPolicyViolation( + "DAG policy violation (DAG ID: {0}, Path: {1}):\n" + "Notices:\n" + "{2}".format(current_task.dag_id, current_task.dag.filepath, notices_list)) + + +def cluster_policy(task): + """Ensure Tasks have non-default owners.""" + _check_task_rules(task) +# [END example_list_of_cluster_policy_rules] diff --git a/tests/dags/test_missing_owner.py b/tests/dags/test_missing_owner.py new file mode 100644 index 0000000..16f715c --- /dev/null +++ b/tests/dags/test_missing_owner.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import timedelta + +from airflow import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.utils.dates import days_ago + +with DAG( + dag_id="test_missing_owner", + schedule_interval="0 0 * * *", + start_date=days_ago(2), + dagrun_timeout=timedelta(minutes=60), + tags=["example"], +) as dag: + run_this_last = DummyOperator(task_id="test_task",) diff --git a/tests/dags/test_with_non_default_owner.py b/tests/dags/test_with_non_default_owner.py new file mode 100644 index 0000000..eebbb64 --- /dev/null +++ b/tests/dags/test_with_non_default_owner.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import timedelta + +from airflow import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.utils.dates import days_ago + +with DAG( + dag_id="test_with_non_default_owner", + schedule_interval="0 0 * * *", + start_date=days_ago(2), + dagrun_timeout=timedelta(minutes=60), + tags=["example"], +) as dag: + run_this_last = DummyOperator(task_id="test_task", owner="John",) diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index b9d18ac..1595cd4 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -38,6 +38,7 @@ from airflow.utils.dates import timezone as tz from airflow.utils.db import create_session from airflow.utils.state import State from airflow.utils.timezone import utc +from tests import cluster_policies from tests.models import TEST_DAGS_FOLDER, DEFAULT_DATE from tests.test_utils.asserts import assert_queries_count from tests.test_utils.config import conf_vars @@ -695,3 +696,34 @@ class DagBagTest(unittest.TestCase): six.assertCountEqual(self, updated_ser_dag_1.tags, ["example", "new_tag"]) self.assertGreater(updated_ser_dag_1_update_time, ser_dag_1_update_time) + + @patch("airflow.settings.policy", cluster_policies.cluster_policy) + def test_cluster_policy_violation(self): + """test that file processing results in import error when task does not + obey cluster policy. + """ + dag_file = os.path.join(TEST_DAGS_FOLDER, "test_missing_owner.py") + + dagbag = DagBag(dag_folder=dag_file) + self.assertEqual(set(), set(dagbag.dag_ids)) + expected_import_errors = { + dag_file: ( + "DAG policy violation (DAG ID: test_missing_owner, Path: {}):\n" + "Notices:\n" + " * Task must have non-None non-default owner. 
Current value: airflow".format(dag_file) + ) + } + self.maxDiff = None + self.assertEqual(expected_import_errors, dagbag.import_errors) + + @patch("airflow.settings.policy", cluster_policies.cluster_policy) + def test_cluster_policy_obeyed(self): + """test that dag successfully imported without import errors when tasks + obey cluster policy. + """ + dag_file = os.path.join(TEST_DAGS_FOLDER, "test_with_non_default_owner.py") + + dagbag = DagBag(dag_folder=dag_file) + self.assertEqual({"test_with_non_default_owner"}, set(dagbag.dag_ids)) + + self.assertEqual({}, dagbag.import_errors)
