jedcunningham commented on code in PR #45318:
URL: https://github.com/apache/airflow/pull/45318#discussion_r1903148729
##########
airflow/dag_processing/bundles/manager.py:
##########
@@ -90,7 +94,22 @@ def get_bundle(self, name: str, version: str | None = None)
-> BaseDagBundle:
:return: The DAG bundle.
"""
- # TODO: proper validation of the bundle configuration so we have
better error messages
- bundle_config = self.bundle_configs[name]
- bundle_class = import_string(bundle_config["classpath"])
- return bundle_class(name=name, version=version,
**bundle_config["kwargs"])
+ # todo (AIP-66): proper validation of the bundle configuration so we
have better error messages
+ cfg_tuple = _bundle_config.get(name)
+ if not cfg_tuple:
+ self.parse_config()
+ cfg_tuple = _bundle_config.get(name)
+ if not cfg_tuple:
+ raise ValueError(f"Requested bundle '{name}' is not
configured.")
Review Comment:
```suggestion
if not _bundle_config():
self.parse_config()
cfg_tuple = _bundle_config.get(name)
if not cfg_tuple:
raise ValueError(f"Requested bundle '{name}' is not configured.")
```
nit: a little nicer. We can probably do that conditional check everywhere, or
just call it everywhere and short-circuit in `parse_config`.
##########
tests/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -112,13 +131,25 @@ def test_get_all_dag_bundles():
bundle_manager = DagBundlesManager()
- with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE":
json.dumps(BASIC_BUNDLE_CONFIG)}):
- bundles = bundle_manager.get_all_dag_bundles()
- assert len(bundles) == 2
+ with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS":
json.dumps(BASIC_BUNDLE_CONFIG)}):
+ bundles = list(bundle_manager.get_all_dag_bundles())
+ assert len(bundles) == 1
+ assert all(isinstance(x, BaseDagBundle) for x in bundles)
+
+ bundle_names = {x.name for x in bundles}
+ assert bundle_names == {"my-test-bundle"}
+
+
+def test_get_all_dag_bundles_default():
Review Comment:
Same with this one - should be rolled into the parametrized test above.
##########
tests/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -51,23 +60,28 @@ def test_bundle_configs_property(envs, expected_names):
"""Test that bundle_configs are read from configuration."""
bundle_manager = DagBundlesManager()
with patch.dict(os.environ, envs):
- names = set(bundle_manager.bundle_configs.keys())
+ bundle_manager.parse_config()
Review Comment:
Should this be private? It doesn't feel like the caller should have to call parse
explicitly.
##########
tests/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -112,13 +131,25 @@ def test_get_all_dag_bundles():
bundle_manager = DagBundlesManager()
- with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE":
json.dumps(BASIC_BUNDLE_CONFIG)}):
- bundles = bundle_manager.get_all_dag_bundles()
- assert len(bundles) == 2
+ with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS":
json.dumps(BASIC_BUNDLE_CONFIG)}):
+ bundles = list(bundle_manager.get_all_dag_bundles())
+ assert len(bundles) == 1
Review Comment:
Feels like we should test with more than one bundle.
##########
airflow/config_templates/config.yml:
##########
@@ -2676,30 +2676,38 @@ dag_bundles:
description: |
Configuration for the DAG bundles. This allows Airflow to load DAGs from
different sources.
- Airflow will consume all options added to this section. Below you will see
only the default,
- ``dags_folder``. The option name is the bundle name and the value is a
json object with the following
- keys:
-
- * classpath: The classpath of the bundle class
- * kwargs: The keyword arguments to pass to the bundle class
- * refresh_interval: The interval in seconds to refresh the bundle from its
source.
+ options:
+ backends:
+ description: |
+ List of backend configs. Must supply name, classpath, and kwargs for
each backend.
- For example, to add a new bundle named ``hello`` to my Airflow instance,
add the following to your
- airflow.cfg (this is just an example, the classpath and kwargs are not
real):
+ By default, ``refresh_interval`` is set to ``[scheduler]
dag_dir_list_interval``, but that can
+ also be overridden in kwargs if desired.
- .. code-block:: ini
+ The default is the dags folder dag bundle.
- [dag_bundles]
- hello: {classpath: "airflow.some.classpath", kwargs: {"hello":
"world"}, refresh_interval: 60}
- options:
- dags_folder:
- description: |
- This is the default DAG bundle that loads DAGs from the traditional
``[core] dags_folder``.
- By default, ``refresh_interval`` is set to ``[scheduler]
dag_dir_list_interval``, but that can be
- overridden here if desired.
- Parsing DAGs from the DAG folder can be disabled by setting this
option to an empty string.
- version_added: ~
+ Note: As shown below, you can split your json config over multiple
lines by indenting.
+ See configparser documentation for an example:
+
https://docs.python.org/3/library/configparser.html#supported-ini-file-structure.
+ version_added: 3.0.0
type: string
- example: ~
- default: '{{"classpath":
"airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle",
- "kwargs": {{}}}}'
+ example: >
+ [
+ {
+ "name": "my-git-repo",
+ "classpath": "airflow.dag_processing.bundles.git.GitDagBundle",
+ "kwargs": {
+ "subdir": "dags",
+ "repo_url": "[email protected]:example.com/my-dags.git",
+ "tracking_ref": "main",
+ "refresh_interval": 0
+ }
+ ]
+ default: >
+ [
+ {{
+ "name": "dags-folder",
Review Comment:
Why the switch to dash?
##########
airflow/dag_processing/bundles/manager.py:
##########
@@ -30,56 +31,59 @@
from airflow.dag_processing.bundles.base import BaseDagBundle
+_bundle_config = {}
+
class DagBundlesManager(LoggingMixin):
"""Manager for DAG bundles."""
- @property
- def bundle_configs(self) -> dict[str, dict]:
- """Get all DAG bundle configurations."""
- configured_bundles = conf.getsection("dag_bundles")
-
- if not configured_bundles:
- return {}
-
- # If dags_folder is empty string, we remove it. This allows the
default dags_folder bundle to be disabled.
- if not configured_bundles["dags_folder"]:
- del configured_bundles["dags_folder"]
+ def parse_config(self) -> None:
+ """
+ Get all DAG bundle configurations and store in module variable.
- dict_bundles: dict[str, dict] = {}
- for key in configured_bundles.keys():
- config = conf.getjson("dag_bundles", key)
- if not isinstance(config, dict):
- raise AirflowConfigException(f"Bundle config for {key} is not
a dict: {config}")
- dict_bundles[key] = config
+ If a bundle class for a given name has already been imported, it will
not be imported again.
+ """
+ configured_bundles = conf.getjson("dag_bundles", "backends")
- return dict_bundles
+ if not configured_bundles:
+ return
+
+ if not isinstance(configured_bundles, list):
+ raise AirflowConfigException(
+ "Bundle config is not a list. Check config value"
+ " for section `dag_bundles` and key `backends`."
+ )
+ seen = set()
+ for cfg in configured_bundles:
+ name = cfg["name"]
+ if name in seen:
+ raise ValueError(f"Dag bundle {name} is configured twice.")
+ seen.add(name)
+ if name in _bundle_config:
+ continue
+ class_ = import_string(cfg["classpath"])
+ kwargs = cfg["kwargs"]
+ _bundle_config[name] = (class_, kwargs)
+
+ # remove obsolete bundle configs
Review Comment:
Not sure we should expect the config to change while a component is up. We
can probably simply short circuit out if `_bundle_config` isn't empty.
##########
tests/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -112,13 +131,25 @@ def test_get_all_dag_bundles():
bundle_manager = DagBundlesManager()
- with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE":
json.dumps(BASIC_BUNDLE_CONFIG)}):
- bundles = bundle_manager.get_all_dag_bundles()
- assert len(bundles) == 2
+ with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS":
json.dumps(BASIC_BUNDLE_CONFIG)}):
+ bundles = list(bundle_manager.get_all_dag_bundles())
+ assert len(bundles) == 1
Review Comment:
Also feels like this is covered by the parametrized test above too.
##########
tests/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -51,23 +60,28 @@ def test_bundle_configs_property(envs, expected_names):
"""Test that bundle_configs are read from configuration."""
bundle_manager = DagBundlesManager()
with patch.dict(os.environ, envs):
- names = set(bundle_manager.bundle_configs.keys())
+ bundle_manager.parse_config()
+ names = set(x.name for x in bundle_manager.get_all_dag_bundles())
assert names == expected_names
@pytest.mark.parametrize(
"config,message",
[
- pytest.param("1", "Bundle config for testbundle is not a dict: 1",
id="int"),
- pytest.param("[]", r"Bundle config for testbundle is not a dict:
\[\]", id="list"),
- pytest.param("abc", r"Unable to parse .* as valid json",
id="not_json"),
+ pytest.param("1", "Bundle config is not a list", id="int"),
+ pytest.param("[]", None, id="list"),
+ pytest.param("{}", None, id="dict"),
Review Comment:
These cases should just move to the other parametrized test.
##########
tests/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -51,23 +60,28 @@ def test_bundle_configs_property(envs, expected_names):
"""Test that bundle_configs are read from configuration."""
Review Comment:
(Commenting here as it's as close as I can get)
Test should be renamed.
##########
tests/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -35,14 +35,23 @@
@pytest.mark.parametrize(
"envs,expected_names",
[
- pytest.param({}, {"dags_folder"}, id="no_config"),
+ pytest.param({}, {"dags-folder"}, id="default"),
pytest.param(
- {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": "{}"}, {"testbundle",
"dags_folder"}, id="add_bundle"
Review Comment:
You no longer have any cases with a non-`dags-folder` bundle.
##########
tests/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -139,15 +170,21 @@ def _get_bundle_names_and_active():
)
# Initial add
- with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE":
json.dumps(BASIC_BUNDLE_CONFIG)}):
+ with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS":
json.dumps(BASIC_BUNDLE_CONFIG)}):
bundle_manager.sync_bundles_to_db()
- assert _get_bundle_names_and_active() == [("dags_folder", True),
("testbundle", True)]
+ assert _get_bundle_names_and_active() == [("my-test-bundle", True)]
# Disable ones that disappear from config
bundle_manager.sync_bundles_to_db()
- assert _get_bundle_names_and_active() == [("dags_folder", True),
("testbundle", False)]
+ assert _get_bundle_names_and_active() == [("dags-folder", True),
("my-test-bundle", False)]
# Re-enable one that reappears in config
- with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE":
json.dumps(BASIC_BUNDLE_CONFIG)}):
+ with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS":
json.dumps(BASIC_BUNDLE_CONFIG)}):
bundle_manager.sync_bundles_to_db()
- assert _get_bundle_names_and_active() == [("dags_folder", True),
("testbundle", True)]
+ assert _get_bundle_names_and_active() == [("dags-folder", False),
("my-test-bundle", True)]
+
+
+# import yaml
Review Comment:
Leftovers :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]