This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1f7ffee8207 Add more validation of bundle config list (#47618)
1f7ffee8207 is described below
commit 1f7ffee8207331967294265aa2446a12b33c450d
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Mar 11 15:29:11 2025 -0700
Add more validation of bundle config list (#47618)
---
airflow/dag_processing/bundles/manager.py | 86 ++++++++++++++--------
.../bundles/test_dag_bundle_manager.py | 4 +-
2 files changed, 56 insertions(+), 34 deletions(-)
diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py
index 09767281438..18e855230bc 100644
--- a/airflow/dag_processing/bundles/manager.py
+++ b/airflow/dag_processing/bundles/manager.py
@@ -32,6 +32,52 @@ if TYPE_CHECKING:
from airflow.dag_processing.bundles.base import BaseDagBundle
+_example_dag_bundle_name = "example_dags"
+
+
+def _bundle_item_exc(msg):
+ return AirflowConfigException(
+ "Invalid config for section `dag_processor` key `dag_bundle_config_list`. " + msg
+ )
+
+
+def _validate_bundle_config(config_list):
+ all_names = []
+ expected_keys = {"name", "classpath", "kwargs"}
+ for item in config_list:
+ if not isinstance(item, dict):
+ raise _bundle_item_exc(f"Expected dict but got {item.__class__}")
+ actual_keys = set(item.keys())
+ if not actual_keys == expected_keys:
+ raise _bundle_item_exc(f"Expected keys {expected_keys} but found {actual_keys}")
+ bundle_name = item["name"]
+ if not bundle_name:
+ raise _bundle_item_exc(f"Item {item} missing required `name` attr.")
+ if bundle_name == _example_dag_bundle_name:
+ raise AirflowConfigException(
+ f"Bundle name '{_example_dag_bundle_name}' is a reserved name. Please choose another name for your bundle."
+ " Example DAGs can be enabled with the '[core] load_examples' config."
+ )
+
+ all_names.append(bundle_name)
+ if len(all_names) != len(set(all_names)):
+ raise _bundle_item_exc(f"One or more bundle names appeared multiple times: {all_names}")
+
+
+def _add_example_dag_bundle(config_list):
+ from airflow import example_dags
+
+ example_dag_folder = next(iter(example_dags.__path__))
+ config_list.append(
+ {
+ "name": _example_dag_bundle_name,
+ "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+ "kwargs": {
+ "path": example_dag_folder,
+ },
+ }
+ )
+
class DagBundlesManager(LoggingMixin):
"""Manager for DAG bundles."""
@@ -54,44 +100,20 @@ class DagBundlesManager(LoggingMixin):
if self._bundle_config:
return
- backends = conf.getjson("dag_processor", "dag_bundle_config_list")
-
- if not backends:
+ config_list = conf.getjson("dag_processor", "dag_bundle_config_list")
+ if not config_list:
return
-
- if not isinstance(backends, list):
- raise AirflowConfigException(
- "Bundle config is not a list. Check config value"
- " for section `dag_processor` and key `dag_bundle_config_list`."
- )
-
- if any(b["name"] == "example_dags" for b in backends):
+ if not isinstance(config_list, list):
raise AirflowConfigException(
- "Bundle name 'example_dags' is a reserved name. Please choose another name for your bundle."
- " Example DAGs can be enabled with the '[core] load_examples' config."
+ "Section `dag_processor` key `dag_bundle_config_list` "
+ f"must be list but got {config_list.__class__}"
)
-
- # example dags!
+ _validate_bundle_config(config_list)
if conf.getboolean("core", "LOAD_EXAMPLES"):
- from airflow import example_dags
-
- example_dag_folder = next(iter(example_dags.__path__))
- backends.append(
- {
- "name": "example_dags",
- "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
- "kwargs": {
- "path": example_dag_folder,
- },
- }
- )
+ _add_example_dag_bundle(config_list)
- seen = set()
- for cfg in backends:
+ for cfg in config_list:
name = cfg["name"]
- if name in seen:
- raise ValueError(f"Dag bundle {name} is configured twice.")
- seen.add(name)
class_ = import_string(cfg["classpath"])
kwargs = cfg["kwargs"]
self._bundle_config[name] = (class_, kwargs)
diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py
index cba51c33efb..15bf2dbc1d5 100644
--- a/tests/dag_processing/bundles/test_dag_bundle_manager.py
+++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py
@@ -62,7 +62,7 @@ from tests_common.test_utils.db import clear_db_dag_bundles
set(),
id="remove_dags_folder_default",
),
- pytest.param("1", "Bundle config is not a list", id="int"),
+ pytest.param("1", "key `dag_bundle_config_list` must be list", id="int"),
pytest.param("abc", "Unable to parse .* as valid json", id="not_json"),
],
)
@@ -192,7 +192,7 @@ def test_example_dags_bundle_added():
def test_example_dags_name_is_reserved():
- reserved_name_config = [{"name": "example_dags"}]
+ reserved_name_config = [{"name": "example_dags", "classpath": "yo face", "kwargs": {}}]
with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(reserved_name_config)}):
with pytest.raises(AirflowConfigException, match="Bundle name 'example_dags' is a reserved name."):
DagBundlesManager().parse_config()