This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f7ffee8207 Add more validation of bundle config list (#47618)
1f7ffee8207 is described below

commit 1f7ffee8207331967294265aa2446a12b33c450d
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Mar 11 15:29:11 2025 -0700

    Add more validation of bundle config list (#47618)
---
 airflow/dag_processing/bundles/manager.py          | 86 ++++++++++++++--------
 .../bundles/test_dag_bundle_manager.py             |  4 +-
 2 files changed, 56 insertions(+), 34 deletions(-)

diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py
index 09767281438..18e855230bc 100644
--- a/airflow/dag_processing/bundles/manager.py
+++ b/airflow/dag_processing/bundles/manager.py
@@ -32,6 +32,52 @@ if TYPE_CHECKING:
 
     from airflow.dag_processing.bundles.base import BaseDagBundle
 
+_example_dag_bundle_name = "example_dags"
+
+
+def _bundle_item_exc(msg):
+    return AirflowConfigException(
+        "Invalid config for section `dag_processor` key `dag_bundle_config_list`. " + msg
+    )
+
+
+def _validate_bundle_config(config_list):
+    all_names = []
+    expected_keys = {"name", "classpath", "kwargs"}
+    for item in config_list:
+        if not isinstance(item, dict):
+            raise _bundle_item_exc(f"Expected dict but got {item.__class__}")
+        actual_keys = set(item.keys())
+        if not actual_keys == expected_keys:
+            raise _bundle_item_exc(f"Expected keys {expected_keys} but found {actual_keys}")
+        bundle_name = item["name"]
+        if not bundle_name:
+            raise _bundle_item_exc(f"Item {item} missing required `name` attr.")
+        if bundle_name == _example_dag_bundle_name:
+            raise AirflowConfigException(
+                f"Bundle name '{_example_dag_bundle_name}' is a reserved name. Please choose another name for your bundle."
+                " Example DAGs can be enabled with the '[core] load_examples' config."
+            )
+
+        all_names.append(bundle_name)
+    if len(all_names) != len(set(all_names)):
+        raise _bundle_item_exc(f"One or more bundle names appeared multiple times: {all_names}")
+
+
+def _add_example_dag_bundle(config_list):
+    from airflow import example_dags
+
+    example_dag_folder = next(iter(example_dags.__path__))
+    config_list.append(
+        {
+            "name": _example_dag_bundle_name,
+            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+            "kwargs": {
+                "path": example_dag_folder,
+            },
+        }
+    )
+
 
 class DagBundlesManager(LoggingMixin):
     """Manager for DAG bundles."""
@@ -54,44 +100,20 @@ class DagBundlesManager(LoggingMixin):
         if self._bundle_config:
             return
 
-        backends = conf.getjson("dag_processor", "dag_bundle_config_list")
-
-        if not backends:
+        config_list = conf.getjson("dag_processor", "dag_bundle_config_list")
+        if not config_list:
             return
-
-        if not isinstance(backends, list):
-            raise AirflowConfigException(
-                "Bundle config is not a list. Check config value"
-                " for section `dag_processor` and key `dag_bundle_config_list`."
-            )
-
-        if any(b["name"] == "example_dags" for b in backends):
+        if not isinstance(config_list, list):
             raise AirflowConfigException(
-                "Bundle name 'example_dags' is a reserved name. Please choose another name for your bundle."
-                " Example DAGs can be enabled with the '[core] load_examples' config."
+                "Section `dag_processor` key `dag_bundle_config_list` "
+                f"must be list but got {config_list.__class__}"
             )
-
-        # example dags!
+        _validate_bundle_config(config_list)
         if conf.getboolean("core", "LOAD_EXAMPLES"):
-            from airflow import example_dags
-
-            example_dag_folder = next(iter(example_dags.__path__))
-            backends.append(
-                {
-                    "name": "example_dags",
-                    "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
-                    "kwargs": {
-                        "path": example_dag_folder,
-                    },
-                }
-            )
+            _add_example_dag_bundle(config_list)
 
-        seen = set()
-        for cfg in backends:
+        for cfg in config_list:
             name = cfg["name"]
-            if name in seen:
-                raise ValueError(f"Dag bundle {name} is configured twice.")
-            seen.add(name)
             class_ = import_string(cfg["classpath"])
             kwargs = cfg["kwargs"]
             self._bundle_config[name] = (class_, kwargs)
diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py
index cba51c33efb..15bf2dbc1d5 100644
--- a/tests/dag_processing/bundles/test_dag_bundle_manager.py
+++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py
@@ -62,7 +62,7 @@ from tests_common.test_utils.db import clear_db_dag_bundles
             set(),
             id="remove_dags_folder_default",
         ),
-        pytest.param("1", "Bundle config is not a list", id="int"),
+        pytest.param("1", "key `dag_bundle_config_list` must be list", id="int"),
         pytest.param("abc", "Unable to parse .* as valid json", id="not_json"),
     ],
 )
@@ -192,7 +192,7 @@ def test_example_dags_bundle_added():
 
 
 def test_example_dags_name_is_reserved():
-    reserved_name_config = [{"name": "example_dags"}]
+    reserved_name_config = [{"name": "example_dags", "classpath": "yo face", "kwargs": {}}]
     with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(reserved_name_config)}):
         with pytest.raises(AirflowConfigException, match="Bundle name 'example_dags' is a reserved name."):
             DagBundlesManager().parse_config()

Reply via email to