ephraimbuddy commented on code in PR #44924:
URL: https://github.com/apache/airflow/pull/44924#discussion_r1885002162
##########
airflow/models/dag.py:
##########
@@ -2028,7 +2027,7 @@ class DagModel(Base):
fileloc = Column(String(2000))
# The base directory used by Dag Processor that parsed this dag.
processor_subdir = Column(String(2000), nullable=True)
- bundle_id = Column(UUIDType(binary=False), ForeignKey("dag_bundle.id"), nullable=True)
+ bundle_name = Column(StringID(), ForeignKey("dag_bundle.name"), nullable=True)
Review Comment:
To preserve history, I think we should use an association table, so that when
a dag object is assigned a new bundle object, the old link is kept.
Example: if a dag 'A' is in dag-bundle 'DA', and 'DA' is no longer configured
or its name was changed, that triggers a new dag-bundle object, say 'DB',
which now contains dag 'A'. With a plain column, the DAG's bundle_name would
update to the new dag-bundle object 'DB', and we would lose the previous
bundle name. With an association table, we can add an is_active flag that
tells us whether a bundle link has been removed, at the cost of more complex
queries. Another option I considered is a history table like TIH, but DAGs
change more often.
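A minimal sketch of what that association table could look like (model and
column names here are hypothetical, not the actual models in this PR):

```python
# Hypothetical sketch: keep one row per (dag, bundle) pairing and flip
# is_active instead of overwriting, so previous bundle names are preserved.
from sqlalchemy import Boolean, Column, ForeignKey, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagBundleModel(Base):
    __tablename__ = "dag_bundle"
    name = Column(String(250), primary_key=True)


class DagModel(Base):
    __tablename__ = "dag"
    dag_id = Column(String(250), primary_key=True)


class DagBundleAssociation(Base):
    __tablename__ = "dag_bundle_association"
    dag_id = Column(String(250), ForeignKey("dag.dag_id"), primary_key=True)
    bundle_name = Column(String(250), ForeignKey("dag_bundle.name"), primary_key=True)
    is_active = Column(Boolean, default=True, nullable=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([DagBundleModel(name="DA"), DagBundleModel(name="DB"), DagModel(dag_id="A")])
    session.add(DagBundleAssociation(dag_id="A", bundle_name="DA"))
    session.commit()

    # Bundle 'DA' was removed/renamed: deactivate the old link, add the new one.
    session.get(DagBundleAssociation, ("A", "DA")).is_active = False
    session.add(DagBundleAssociation(dag_id="A", bundle_name="DB"))
    session.commit()

    history = session.query(DagBundleAssociation).filter_by(dag_id="A").count()
    active = (
        session.query(DagBundleAssociation)
        .filter_by(dag_id="A", is_active=True)
        .one()
        .bundle_name
    )

print(history, active)  # both rows survive; only 'DB' is active
```

The tradeoff is that "which bundle does this DAG belong to" becomes a filtered
join on is_active rather than a simple column read.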
##########
airflow/dag_processing/bundles/manager.py:
##########
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models.dagbundle import DagBundleModel
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.module_loading import import_string
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from airflow.dag_processing.bundles.base import BaseDagBundle
+
+
+class DagBundlesManager(LoggingMixin):
+    """Manager for DAG bundles."""
+
+    @property
+    def bundle_configs(self) -> dict[str, dict]:
+        """Get all DAG bundle configurations."""
+        configured_bundles = conf.getsection("dag_bundles")
+
+        if not configured_bundles:
+            return {}
+
+        # If dags_folder is empty string, we remove it. This allows the default dags_folder bundle to be disabled.
+        if not configured_bundles["dags_folder"]:
+            del configured_bundles["dags_folder"]
+
+        dict_bundles: dict[str, dict] = {}
+        for key in configured_bundles.keys():
+            config = conf.getjson("dag_bundles", key)
Review Comment:
How do we handle the case of duplicate bundle names in the configuration?
Should we throw an error, or log that there's a duplicate and that we are
using the last one?
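One possible shape for that check (purely a sketch; `validate_bundle_names`
and its error message are made up, and Airflow's config layer may already
normalize or dedupe section keys on its own):

```python
# Hypothetical sketch: fail fast on bundle names that collide, e.g. names
# differing only in case, instead of silently letting the last one win.
def validate_bundle_names(configured_bundles: dict) -> None:
    seen: dict[str, str] = {}
    for name in configured_bundles:
        key = name.lower()
        if key in seen:
            raise ValueError(
                f"Duplicate DAG bundle name {name!r} conflicts with {seen[key]!r}"
            )
        seen[key] = name


validate_bundle_names({"dags_folder": "{}", "hello": "{}"})  # ok

duplicate_detected = False
try:
    validate_bundle_names({"hello": "{}", "Hello": "{}"})
except ValueError:
    duplicate_detected = True
```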
##########
airflow/config_templates/config.yml:
##########
@@ -2654,3 +2654,34 @@ usage_data_collection:
example: ~
default: "True"
see_also: ":ref:`Usage data collection FAQ <usage-data-collection>`"
+dag_bundles:
+ description: |
+ Configuration for the DAG bundles. This allows Airflow to load DAGs from different sources.
+
+ Airflow will consume all options added to this section. Below you will see only the default,
+ ``dags_folder``. The option name is the bundle name and the value is a json object with the following
+ keys:
+
+ * classpath: The classpath of the bundle class
+ * kwargs: The keyword arguments to pass to the bundle class
+ * refresh_interval: The interval in seconds to refresh the bundle from its source.
+
+ For example, to add a new bundle named ``hello`` to my Airflow instance, add the following to your
+ airflow.cfg (this is just an example, the classpath and kwargs are not real):
+
+ .. code-block:: ini
+
+ [dag_bundles]
+ hello: {classpath: "airflow.some.classpath", kwargs: {"hello": "world"}, refresh_interval: 60}
+ options:
+ dags_folder:
+ description: |
This is the default DAG bundle that loads DAGs from the traditional ``[core] dags_folder``.
By default, ``refresh_interval`` it set to ``[scheduler] dag_dir_list_interval``, but that can be
Review Comment:
```suggestion
By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can be
```
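The fallback described here might look roughly like this (a sketch only;
`resolve_refresh_interval` is a made-up helper, not the PR's actual code):

```python
import json


def resolve_refresh_interval(raw_bundle_config: str, dag_dir_list_interval: int) -> int:
    # Hypothetical helper: use the bundle's own refresh_interval when given,
    # otherwise fall back to [scheduler] dag_dir_list_interval.
    config = json.loads(raw_bundle_config)
    return config.get("refresh_interval", dag_dir_list_interval)


default = resolve_refresh_interval('{"classpath": "x.y.DagsFolderBundle"}', 300)
explicit = resolve_refresh_interval(
    '{"classpath": "x.y.DagsFolderBundle", "refresh_interval": 60}', 300
)
print(default, explicit)  # → 300 60
```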
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]