This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c68a319f04 Introducing AirflowClusterPolicySkipDag exception (#32013)
c68a319f04 is described below

commit c68a319f04d90bd91a116d80a1f0d2b06879571e
Author: Changhoon Oh <[email protected]>
AuthorDate: Wed Jul 5 05:08:02 2023 +0900

    Introducing AirflowClusterPolicySkipDag exception (#32013)
---
 airflow/exceptions.py                              |  9 ++++++-
 airflow/models/dagbag.py                           |  5 +++-
 .../cluster-policies.rst                           |  4 ++++
 docs/apache-airflow/best-practices.rst             | 28 ++++++++++++++++++++++
 tests/cluster_policies/__init__.py                 |  9 +++++--
 5 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 9cdb8c418a..8c65a1f66f 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -169,8 +169,15 @@ class AirflowClusterPolicyViolation(AirflowException):
     """Raise when there is a violation of a Cluster Policy in DAG definition."""
 
 
+class AirflowClusterPolicySkipDag(AirflowException):
+    """Raise when skipping dag is needed in Cluster Policy."""
+
+
 class AirflowClusterPolicyError(AirflowException):
-    """Raise when there is an error except AirflowClusterPolicyViolation in Cluster Policy."""
+    """
+    Raise when there is an error in Cluster Policy,
+    except AirflowClusterPolicyViolation and AirflowClusterPolicySkipDag.
+    """
 
 
 class AirflowTimetableInvalid(AirflowException):
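
For orientation, the relationship between the three cluster-policy exceptions can be sketched with minimal stand-in classes (local definitions mirroring the diff above, not the real ``airflow.exceptions`` imports):

```python
# Stand-in sketch of the exception hierarchy from the diff above;
# these are minimal local classes, not the real airflow.exceptions imports.
class AirflowException(Exception):
    """Base class for all Airflow errors (stand-in)."""


class AirflowClusterPolicyViolation(AirflowException):
    """Raised when a DAG/task violates a cluster policy."""


class AirflowClusterPolicySkipDag(AirflowException):
    """Raised when a cluster policy wants to skip a DAG silently."""


class AirflowClusterPolicyError(AirflowException):
    """Raised for any other cluster-policy error."""


# All three are AirflowException subclasses, but SkipDag is deliberately
# a sibling of Violation, not a subclass, so handlers can tell them apart.
assert issubclass(AirflowClusterPolicySkipDag, AirflowException)
assert not issubclass(AirflowClusterPolicySkipDag, AirflowClusterPolicyViolation)
```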
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index b422d4699e..1b80cb0158 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -38,6 +38,7 @@ from airflow import settings
 from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowClusterPolicyError,
+    AirflowClusterPolicySkipDag,
     AirflowClusterPolicyViolation,
     AirflowDagCycleException,
     AirflowDagDuplicatedIdException,
@@ -442,6 +443,8 @@ class DagBag(LoggingMixin):
             try:
                 dag.validate()
                 self.bag_dag(dag=dag, root_dag=dag)
+            except AirflowClusterPolicySkipDag:
+                pass
             except Exception as e:
                 self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                 self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
@@ -477,7 +480,7 @@ class DagBag(LoggingMixin):
 
             for task in dag.tasks:
                 settings.task_policy(task)
-        except AirflowClusterPolicyViolation:
+        except (AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag):
             raise
         except Exception as e:
             self.log.exception(e)
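
The net effect of the ``dagbag.py`` change is that a skip exception falls through silently while any other policy failure is still recorded as an import error. A rough, self-contained sketch of that control flow (stand-in classes and a plain loop, not Airflow's real ``DagBag``):

```python
class AirflowClusterPolicySkipDag(Exception):
    """Stand-in for airflow.exceptions.AirflowClusterPolicySkipDag."""


class AirflowClusterPolicyViolation(Exception):
    """Stand-in for airflow.exceptions.AirflowClusterPolicyViolation."""


def dag_policy(dag: dict) -> None:
    """Toy policy: reject untagged DAGs, skip beta-only DAGs."""
    if not dag["tags"]:
        raise AirflowClusterPolicyViolation(f"DAG {dag['dag_id']} has no tags.")
    if "only_for_beta" in dag["tags"]:
        raise AirflowClusterPolicySkipDag(f"DAG {dag['dag_id']} is beta-only.")


def collect_dags(dags: list[dict]) -> tuple[list[str], dict[str, str]]:
    """Mimic the DagBag loop: bag compliant DAGs, skip silently, record errors."""
    bagged: list[str] = []
    import_errors: dict[str, str] = {}
    for dag in dags:
        try:
            dag_policy(dag)
            bagged.append(dag["dag_id"])
        except AirflowClusterPolicySkipDag:
            pass  # skipped on purpose: no import_error entry, nothing in the UI
        except Exception as e:
            import_errors[dag["dag_id"]] = f"{type(e).__name__}: {e}"
    return bagged, import_errors
```

Here a skipped DAG simply never reaches the bag, whereas a violation still surfaces in ``import_errors`` exactly as before.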
diff --git a/docs/apache-airflow/administration-and-deployment/cluster-policies.rst b/docs/apache-airflow/administration-and-deployment/cluster-policies.rst
index ddf05fde69..c664d0b503 100644
--- a/docs/apache-airflow/administration-and-deployment/cluster-policies.rst
+++ b/docs/apache-airflow/administration-and-deployment/cluster-policies.rst
@@ -43,6 +43,10 @@ There are three main types of cluster policy:
 The DAG and Task cluster policies can raise the :class:`~airflow.exceptions.AirflowClusterPolicyViolation`
 exception to indicate that the dag/task they were passed is not compliant and should not be loaded.
 
+They can also raise the :class:`~airflow.exceptions.AirflowClusterPolicySkipDag` exception
+to skip a DAG intentionally. Unlike :class:`~airflow.exceptions.AirflowClusterPolicyViolation`,
+this exception is not displayed on the Airflow web UI (internally, it is not recorded in the ``import_error`` table in the metadata database).
+
 Any extra attributes set by a cluster policy take priority over those defined in your DAG file; for example,
 if you set an ``sla`` on your Task in the DAG file, and then your cluster policy also sets an ``sla``, the
 cluster policy's value will take precedence.
diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index 2e09f9141d..b7202ef5a0 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -394,6 +394,34 @@ It's important to note, that without ``watcher`` task, the whole DAG Run will ge
 If we want the ``watcher`` to monitor the state of all tasks, we need to make it dependent on all of them separately. Thanks to this, we can fail the DAG Run if any of the tasks fail. Note that the watcher task has a trigger rule set to ``"one_failed"``.
 On the other hand, without the ``teardown`` task, the ``watcher`` task will not be needed, because ``failing_task`` will propagate its ``failed`` state to downstream task ``passed_task`` and the whole DAG Run will also get the ``failed`` status.
 
+
+Using AirflowClusterPolicySkipDag exception in cluster policies to skip specific DAGs
+-------------------------------------------------------------------------------------
+
+.. versionadded:: 2.7
+
+Airflow DAGs are usually deployed and updated from a specific branch of a Git repository via ``git-sync``.
+But when you have to run multiple Airflow clusters for operational reasons, maintaining multiple Git branches becomes cumbersome.
+In particular, it is difficult to synchronize two separate branches (like ``prod`` and ``beta``) periodically with a proper branching strategy.
+
+- cherry-picking every change between branches is cumbersome to maintain.
+- a hard reset is not a recommended way to do GitOps.
+
+So, you can consider connecting multiple Airflow clusters to the same Git branch (like ``main``), and maintaining them with different environment variables and different connection configurations under the same ``connection_id``.
+You can also raise the :class:`~airflow.exceptions.AirflowClusterPolicySkipDag` exception in the cluster policy, to load specific DAGs into the :class:`~airflow.models.dagbag.DagBag` on a specific Airflow deployment only, if needed.
+
+.. code-block:: python
+
+  def dag_policy(dag: DAG):
+      """Skipping the DAG with `only_for_beta` tag."""
+
+      if "only_for_beta" in dag.tags:
+          raise AirflowClusterPolicySkipDag(
+              f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
+          )
+
+The example above shows a ``dag_policy`` snippet that skips the DAG depending on the tags it has.
+
 .. _best_practices/reducing_dag_complexity:
 
 Reducing DAG complexity
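
The single-branch, multi-cluster setup described in the new best-practices section could key the skip decision off a deployment-specific environment variable. A hypothetical sketch (the ``AIRFLOW_CLUSTER`` variable and the stand-in exception class are illustrative assumptions, not part of the commit):

```python
import os
from types import SimpleNamespace


class AirflowClusterPolicySkipDag(Exception):
    """Stand-in for airflow.exceptions.AirflowClusterPolicySkipDag."""


def dag_policy(dag) -> None:
    """Skip beta-only DAGs everywhere except the beta deployment.

    AIRFLOW_CLUSTER is a hypothetical per-deployment environment
    variable (e.g. set to "prod" or "beta" on each cluster).
    """
    cluster = os.environ.get("AIRFLOW_CLUSTER", "prod")
    if cluster != "beta" and "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG {dag.dag_id} only runs on the beta cluster."
        )


# On a prod-configured deployment, the beta-only DAG is skipped:
os.environ["AIRFLOW_CLUSTER"] = "prod"
beta_dag = SimpleNamespace(dag_id="beta_dag", tags=["only_for_beta"])
try:
    dag_policy(beta_dag)
    skipped = False
except AirflowClusterPolicySkipDag:
    skipped = True
```

With this shape, every cluster syncs the same ``main`` branch; only the environment variable decides which DAGs each deployment actually bags.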
diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py
index 40c3b21406..5aaf5701ac 100644
--- a/tests/cluster_policies/__init__.py
+++ b/tests/cluster_policies/__init__.py
@@ -22,7 +22,7 @@ from datetime import timedelta
 from typing import Callable
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowClusterPolicyViolation
+from airflow.exceptions import AirflowClusterPolicySkipDag, AirflowClusterPolicyViolation
 from airflow.models import DAG, TaskInstance
 from airflow.models.baseoperator import BaseOperator
 
@@ -73,12 +73,17 @@ def example_task_policy(task: BaseOperator):
 
 # [START example_dag_cluster_policy]
 def dag_policy(dag: DAG):
-    """Ensure that DAG has at least one tag"""
+    """Ensure that DAG has at least one tag and skip the DAG with `only_for_beta` tag."""
     if not dag.tags:
         raise AirflowClusterPolicyViolation(
             f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
         )
 
+    if "only_for_beta" in dag.tags:
+        raise AirflowClusterPolicySkipDag(
+            f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
+        )
+
 
 # [END example_dag_cluster_policy]
 
