This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c68a319f04 Introducing AirflowClusterPolicySkipDag exception (#32013)
c68a319f04 is described below
commit c68a319f04d90bd91a116d80a1f0d2b06879571e
Author: Changhoon Oh <[email protected]>
AuthorDate: Wed Jul 5 05:08:02 2023 +0900
Introducing AirflowClusterPolicySkipDag exception (#32013)
---
airflow/exceptions.py | 9 ++++++-
airflow/models/dagbag.py | 5 +++-
.../cluster-policies.rst | 4 ++++
docs/apache-airflow/best-practices.rst | 28 ++++++++++++++++++++++
tests/cluster_policies/__init__.py | 9 +++++--
5 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 9cdb8c418a..8c65a1f66f 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -169,8 +169,15 @@ class AirflowClusterPolicyViolation(AirflowException):
"""Raise when there is a violation of a Cluster Policy in DAG
definition."""
+class AirflowClusterPolicySkipDag(AirflowException):
+ """Raise in a cluster policy to skip a DAG silently, without recording an import error."""
+
+
class AirflowClusterPolicyError(AirflowException):
- """Raise when there is an error except AirflowClusterPolicyViolation in
Cluster Policy."""
+ """
+ Raise when a cluster policy fails with any error other than
+ AirflowClusterPolicyViolation or AirflowClusterPolicySkipDag.
+ """
class AirflowTimetableInvalid(AirflowException):
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index b422d4699e..1b80cb0158 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -38,6 +38,7 @@ from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import (
AirflowClusterPolicyError,
+ AirflowClusterPolicySkipDag,
AirflowClusterPolicyViolation,
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
@@ -442,6 +443,8 @@ class DagBag(LoggingMixin):
try:
dag.validate()
self.bag_dag(dag=dag, root_dag=dag)
+ except AirflowClusterPolicySkipDag:
+ pass
except Exception as e:
self.log.exception("Failed to bag_dag: %s", dag.fileloc)
self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
@@ -477,7 +480,7 @@ class DagBag(LoggingMixin):
for task in dag.tasks:
settings.task_policy(task)
- except AirflowClusterPolicyViolation:
+ except (AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag):
raise
except Exception as e:
self.log.exception(e)
diff --git
a/docs/apache-airflow/administration-and-deployment/cluster-policies.rst
b/docs/apache-airflow/administration-and-deployment/cluster-policies.rst
index ddf05fde69..c664d0b503 100644
--- a/docs/apache-airflow/administration-and-deployment/cluster-policies.rst
+++ b/docs/apache-airflow/administration-and-deployment/cluster-policies.rst
@@ -43,6 +43,10 @@ There are three main types of cluster policy:
The DAG and Task cluster policies can raise the
:class:`~airflow.exceptions.AirflowClusterPolicyViolation`
exception to indicate that the dag/task they were passed is not compliant and
should not be loaded.
+They can also raise the :class:`~airflow.exceptions.AirflowClusterPolicySkipDag` exception
+when a DAG should be skipped intentionally. Unlike :class:`~airflow.exceptions.AirflowClusterPolicyViolation`,
+this exception is not displayed in the Airflow web UI (internally, it is not recorded in the ``import_error`` table of the metadata database).
+
Any extra attributes set by a cluster policy take priority over those defined
in your DAG file; for example,
if you set an ``sla`` on your Task in the DAG file, and then your cluster
policy also sets an ``sla``, the
cluster policy's value will take precedence.
diff --git a/docs/apache-airflow/best-practices.rst
b/docs/apache-airflow/best-practices.rst
index 2e09f9141d..b7202ef5a0 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -394,6 +394,34 @@ It's important to note, that without ``watcher`` task, the
whole DAG Run will ge
If we want the ``watcher`` to monitor the state of all tasks, we need to make
it dependent on all of them separately. Thanks to this, we can fail the DAG Run
if any of the tasks fail. Note that the watcher task has a trigger rule set to
``"one_failed"``.
On the other hand, without the ``teardown`` task, the ``watcher`` task will
not be needed, because ``failing_task`` will propagate its ``failed`` state to
downstream task ``passed_task`` and the whole DAG Run will also get the
``failed`` status.
+
+Using AirflowClusterPolicySkipDag exception in cluster policies to skip specific DAGs
+-------------------------------------------------------------------------------------
+
+.. versionadded:: 2.7
+
+Airflow DAGs are usually deployed and updated from a specific branch of a Git repository via ``git-sync``.
+However, when you have to run multiple Airflow clusters for operational reasons, maintaining a separate Git branch for each cluster is cumbersome.
+In particular, it is difficult to periodically synchronize two separate branches (like ``prod`` and ``beta``) while following a proper branching strategy:
+
+- cherry-picking commits between branches is too cumbersome to maintain.
+- hard-resetting one branch to another is not a recommended practice for GitOps.
+
+Instead, you can connect multiple Airflow clusters to the same Git branch (like ``main``) and differentiate them with different environment variables and different connection configurations that share the same ``connection_id``.
+You can also raise the :class:`~airflow.exceptions.AirflowClusterPolicySkipDag` exception in a cluster policy so that specific DAGs are loaded into the :class:`~airflow.models.dagbag.DagBag` only on a specific Airflow deployment, if needed.
+
+.. code-block:: python
+
+ def dag_policy(dag: DAG):
+ """Skipping the DAG with `only_for_beta` tag."""
+
+ if "only_for_beta" in dag.tags:
+ raise AirflowClusterPolicySkipDag(
+ f"DAG {dag.dag_id} is not loaded on the production cluster, due
to `only_for_beta` tag."
+ )
+
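+The example above shows a ``dag_policy`` that skips a DAG depending on its tags. Because the policy itself does not check which cluster it runs on, install it only on the deployment that should skip these DAGs (here, the production cluster), for example via that cluster's ``airflow_local_settings.py``.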
+
.. _best_practices/reducing_dag_complexity:
Reducing DAG complexity
diff --git a/tests/cluster_policies/__init__.py
b/tests/cluster_policies/__init__.py
index 40c3b21406..5aaf5701ac 100644
--- a/tests/cluster_policies/__init__.py
+++ b/tests/cluster_policies/__init__.py
@@ -22,7 +22,7 @@ from datetime import timedelta
from typing import Callable
from airflow.configuration import conf
-from airflow.exceptions import AirflowClusterPolicyViolation
+from airflow.exceptions import AirflowClusterPolicySkipDag,
AirflowClusterPolicyViolation
from airflow.models import DAG, TaskInstance
from airflow.models.baseoperator import BaseOperator
@@ -73,12 +73,17 @@ def example_task_policy(task: BaseOperator):
# [START example_dag_cluster_policy]
def dag_policy(dag: DAG):
- """Ensure that DAG has at least one tag"""
+ """Ensure that the DAG has at least one tag, and skip DAGs tagged
`only_for_beta`."""
if not dag.tags:
raise AirflowClusterPolicyViolation(
f"DAG {dag.dag_id} has no tags. At least one tag required. File
path: {dag.fileloc}"
)
+ if "only_for_beta" in dag.tags:
+ raise AirflowClusterPolicySkipDag(
+ f"DAG {dag.dag_id} is not loaded on the production cluster, due to
`only_for_beta` tag."
+ )
+
# [END example_dag_cluster_policy]