Copilot commented on code in PR #63185:
URL: https://github.com/apache/airflow/pull/63185#discussion_r3066493622


##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
             session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
             self.log.info("Deleted import errors for bundle %s which is no longer configured", name)
 
+        session.flush()
+
+    @provide_session
+    def reassign_dags_with_unconfigured_bundles(self, *, session: Session = NEW_SESSION) -> int:
+        """
+        Reassign Dags that reference unconfigured bundles (None or incorrect) to the first configured bundle as a fallback.
+
+        This addresses Dags that reference bundles incorrectly (i.e. the Dag's `bundle_name` matches a value in the database, but that database value does not correspond to a user-configured bundle) as a side effect of the
+        `0082_3_1_0_make_bundle_name_not_nullable.py` migration (see: https://github.com/apache/airflow/issues/63323).
+
+        Instead of attempting to infer the correct bundle for each Dag during the migration, we reassign all Dags with unconfigured bundles to the first configured bundle at DagFileProcessorManager startup. This relaxes the
+        "Requested bundle '{name}' is not configured."
+        error that would otherwise occur when triggering a DagRun immediately after the migration.
+
+        This fallback is not always semantically correct in environments using multiple bundles, but it is a safe, temporary measure that allows users to successfully trigger DagRuns right after the migration.
+
+        The correct Dag-to-bundle assignments will be restored by the Dag processor on the next parsing cycle.
+
+        :param session: ORM Session
+        :return: Number of Dags reassigned.
+        """
+        from airflow.models.dag import DagModel
+        # lazy import to avoid circular import issues
+
+        configured_names = self.bundle_names
+        if not configured_names:
+            # This should not happen because we already have validation at parse_config in constructor.
+            raise AirflowConfigException(
+                "No Dag bundles are currently configured. Cannot reassign Dags with unconfigured bundles to a valid bundle. Please add at least one bundle configuration to your config."
+            )

Review Comment:
   `reassign_dags_with_unconfigured_bundles()` currently raises when 
`bundle_names` is empty, but `DagBundlesManager.parse_config()` explicitly 
allows an empty `dag_bundle_config_list` (see tests covering `"[]"`). Since 
`DagFileProcessorManager.sync_bundles()` now calls this unconditionally at 
startup, an intentionally empty bundle config will crash the Dag processor. 
Consider making this a no-op (return 0 + log) when no bundles are configured, 
or enforce non-empty bundles at config-parse time consistently across the 
codebase.
   ```suggestion
               self.log.info(
                   "No Dag bundles are configured; skipping reassignment of Dags with unconfigured bundles."
               )
               return 0
   ```
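
The sketch below makes the suggested no-op concrete. It re-expresses the reassignment logic with the standard-library `sqlite3` module instead of the ORM. The `dag` table, its columns, and the `reassign_unconfigured` helper are hypothetical stand-ins for `DagModel` and the method under review. The helper returns 0 early when nothing is configured. Otherwise it mirrors the `notin_(...)`/`is_(None)` filter of the PR's `update(DagModel)` statement.

```python
import sqlite3


def reassign_unconfigured(conn: sqlite3.Connection, configured_names: list[str]) -> int:
    """Point rows with an unknown or NULL bundle at the first configured bundle.

    Hypothetical stand-in for DagBundlesManager.reassign_dags_with_unconfigured_bundles:
    a plain ``dag`` table replaces DagModel and sqlite3 replaces the ORM session.
    """
    if not configured_names:
        # No bundles configured: a no-op instead of raising, so an intentionally
        # empty dag_bundle_config_list cannot crash the Dag processor at startup.
        return 0
    default_bundle = configured_names[0]
    placeholders = ", ".join("?" for _ in configured_names)
    cur = conn.execute(
        # ``NULL NOT IN (...)`` evaluates to NULL rather than TRUE, so NULL rows
        # need their own IS NULL predicate (like or_(notin_(...), is_(None))).
        "UPDATE dag SET bundle_name = ? "
        f"WHERE bundle_name NOT IN ({placeholders}) OR bundle_name IS NULL",
        [default_bundle, *configured_names],
    )
    return cur.rowcount


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, bundle_name TEXT)")
    conn.executemany(
        "INSERT INTO dag VALUES (?, ?)",
        [("a", "dags-folder"), ("b", "stale-bundle"), ("c", None)],
    )
    print(reassign_unconfigured(conn, []))  # 0: nothing configured, nothing touched
    print(reassign_unconfigured(conn, ["dags-folder", "example_dags"]))  # 2: rows "b" and "c"
    print(conn.execute("SELECT dag_id, bundle_name FROM dag ORDER BY dag_id").fetchall())
```

With the early return, the startup path in `sync_bundles()` keeps working for an empty bundle list. Row `c` shows why the explicit NULL check is kept next to `NOT IN`.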



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
             session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
             self.log.info("Deleted import errors for bundle %s which is no longer configured", name)
 
+        session.flush()
+
+    @provide_session
+    def reassign_dags_with_unconfigured_bundles(self, *, session: Session = NEW_SESSION) -> int:
+        """
+        Reassign Dags that reference unconfigured bundles (None or incorrect) to the first configured bundle as a fallback.
+
+        This addresses Dags that reference bundles incorrectly (i.e. the Dag's `bundle_name` matches a value in the database, but that database value does not correspond to a user-configured bundle) as a side effect of the
+        `0082_3_1_0_make_bundle_name_not_nullable.py` migration (see: https://github.com/apache/airflow/issues/63323).
+
+        Instead of attempting to infer the correct bundle for each Dag during the migration, we reassign all Dags with unconfigured bundles to the first configured bundle at DagFileProcessorManager startup. This relaxes the
+        "Requested bundle '{name}' is not configured."
+        error that would otherwise occur when triggering a DagRun immediately after the migration.
+
+        This fallback is not always semantically correct in environments using multiple bundles, but it is a safe, temporary measure that allows users to successfully trigger DagRuns right after the migration.
+
+        The correct Dag-to-bundle assignments will be restored by the Dag processor on the next parsing cycle.
+
+        :param session: ORM Session
+        :return: Number of Dags reassigned.
+        """
+        from airflow.models.dag import DagModel
+        # lazy import to avoid circular import issues
+

Review Comment:
   This `DagModel` import is inside the method body, but unlike 
`ParseImportError` in `sync_bundles_to_db`, it doesn't appear to be required to 
avoid a circular import (`airflow.models.dag` does not import 
`DagBundlesManager`). Please move this import to module scope to match 
Airflow’s import conventions and avoid repeated imports on every call.



##########
airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -449,3 +457,161 @@ def test_multiple_bundles_one_fails(clear_db, session):
 
 def test_get_all_bundle_names():
     assert DagBundlesManager().get_all_bundle_names() == ["dags-folder", "example_dags"]
+
+
+@pytest.fixture
+def clear_dags_and_bundles():
+    clear_db_dags()
+    clear_db_dag_bundles()
+    yield
+    clear_db_dags()
+    clear_db_dag_bundles()
+
+
+def _add_dag(session, dag_id: str, bundle_name: str) -> DagModel:
+    dag = DagModel(dag_id=dag_id, bundle_name=bundle_name, fileloc=f"/tmp/{dag_id}.py")
+    session.add(dag)
+    session.flush()
+    return dag
+
+
+@pytest.mark.db_test
+class TestReassignDagsWithUnconfiguredBundles:
+    """Tests for DagBundlesManager.reassign_dags_with_unconfigured_bundles."""
+
+    def _manager_with_bundle_names(self, names: list[str]) -> DagBundlesManager:
+        """Return a DagBundlesManager whose ``bundle_names`` property returns *names*.
+
+        :param names: Bundle names to expose via the ``bundle_names`` property.
+        :return: A patched ``DagBundlesManager`` instance.
+        """
+        with patch.dict(
+            os.environ,
+            {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(BASIC_BUNDLE_CONFIG)},
+        ):
+            manager = DagBundlesManager()
+        # Override bundle_names so the method uses the names we want without
+        # requiring a full bundle config for each name variant.
+        manager.__class__ = type(
+            "PatchedManager", (DagBundlesManager,), {"bundle_names": property(lambda self: names)}
+        )
+        return manager

Review Comment:
   Reassigning `manager.__class__` at runtime to override `bundle_names` is a 
brittle pattern and makes the test harder to understand. Prefer patching the 
property via `monkeypatch`/`patch.object` (or injecting configured names via 
env/config) so the instance type remains stable.
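
The sketch below shows the `patch.object` approach with `unittest.mock.PropertyMock`. `FakeBundlesManager` is a hypothetical stand-in for `DagBundlesManager` so the snippet runs on its own. In the real test the same call would target `DagBundlesManager`. A pytest alternative is `monkeypatch.setattr(DagBundlesManager, "bundle_names", property(lambda self: names))`. Either way the patch goes on the class, because a property cannot be overridden by assigning to the instance.

```python
from __future__ import annotations

from unittest.mock import PropertyMock, patch


class FakeBundlesManager:
    """Hypothetical stand-in for DagBundlesManager; only the property matters here."""

    @property
    def bundle_names(self) -> list[str]:
        return ["dags-folder", "example_dags"]

    def default_bundle(self) -> str | None:
        names = self.bundle_names
        return names[0] if names else None


def test_default_bundle_with_patched_names() -> None:
    manager = FakeBundlesManager()
    # Properties are looked up on the class, so patch the class attribute.
    # The instance keeps its real type, and patch.object restores the original
    # property when the context manager exits.
    with patch.object(
        FakeBundlesManager, "bundle_names", new_callable=PropertyMock, return_value=["only-bundle"]
    ) as mock_names:
        assert manager.default_bundle() == "only-bundle"
        assert type(manager) is FakeBundlesManager
        mock_names.assert_called_once_with()
    assert manager.bundle_names == ["dags-folder", "example_dags"]


if __name__ == "__main__":
    test_default_bundle_with_patched_names()
    print("ok")
```

Unlike the `__class__` swap, the patch is scoped and undone automatically. Other tests in the module therefore never see a modified type.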



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
             session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
             self.log.info("Deleted import errors for bundle %s which is no longer configured", name)
 
+        session.flush()
+
+    @provide_session
+    def reassign_dags_with_unconfigured_bundles(self, *, session: Session = NEW_SESSION) -> int:
+        """
+        Reassign Dags that reference unconfigured bundles (None or incorrect) to the first configured bundle as a fallback.
+
+        This addresses Dags that reference bundles incorrectly (i.e. the Dag's `bundle_name` matches a value in the database, but that database value does not correspond to a user-configured bundle) as a side effect of the
+        `0082_3_1_0_make_bundle_name_not_nullable.py` migration (see: https://github.com/apache/airflow/issues/63323).
+
+        Instead of attempting to infer the correct bundle for each Dag during the migration, we reassign all Dags with unconfigured bundles to the first configured bundle at DagFileProcessorManager startup. This relaxes the
+        "Requested bundle '{name}' is not configured."
+        error that would otherwise occur when triggering a DagRun immediately after the migration.
+
+        This fallback is not always semantically correct in environments using multiple bundles, but it is a safe, temporary measure that allows users to successfully trigger DagRuns right after the migration.
+
+        The correct Dag-to-bundle assignments will be restored by the Dag processor on the next parsing cycle.

Review Comment:
   Several lines in this new docstring exceed the repo’s configured 
110-character line length (ruff/black). Please wrap the long lines (especially 
the migration reference + issue link paragraph) so CI doesn’t fail with E501.
   ```suggestion
           Reassign Dags that reference unconfigured bundles (None or incorrect) to the first
           configured bundle as a fallback.
   
           This addresses Dags that reference bundles incorrectly (i.e. the Dag's `bundle_name`
           matches a value in the database, but that database value does not correspond to a
           user-configured bundle) as a side effect of the
           `0082_3_1_0_make_bundle_name_not_nullable.py` migration
           (see: https://github.com/apache/airflow/issues/63323).
   
           Instead of attempting to infer the correct bundle for each Dag during the migration, we
           reassign all Dags with unconfigured bundles to the first configured bundle at
           DagFileProcessorManager startup. This relaxes the
           "Requested bundle '{name}' is not configured."
           error that would otherwise occur when triggering a DagRun immediately after the migration.
   
           This fallback is not always semantically correct in environments using multiple bundles,
           but it is a safe, temporary measure that allows users to successfully trigger DagRuns
           right after the migration.
   
           The correct Dag-to-bundle assignments will be restored by the Dag processor on the next
           parsing cycle.
   ```
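
Here is a rough local check for the 110-character limit, as a sketch. `long_lines` is a hypothetical helper that only counts characters per line. Ruff's E501 rule also considers display width and exempts some lines, so `ruff check --select E501` remains the authority. The `textwrap.fill` call shows one way to produce a wrapped paragraph like the suggestion above.

```python
from __future__ import annotations

import textwrap


def long_lines(source: str, limit: int = 110) -> list[tuple[int, int]]:
    """Return (1-based line number, length) for each line longer than *limit*.

    Hypothetical helper: a plain character count, not ruff's E501 implementation.
    """
    return [
        (lineno, len(line))
        for lineno, line in enumerate(source.splitlines(), start=1)
        if len(line) > limit
    ]


if __name__ == "__main__":
    # First docstring line from the diff, at its 8-space indentation (diff "+" removed).
    original = (
        "        Reassign Dags that reference unconfigured bundles (None or incorrect) "
        "to the first configured bundle as a fallback."
    )
    print(long_lines(original))  # one entry: this line is longer than 110 characters
    # textwrap.fill counts the indent toward ``width``, so every output line fits.
    rewrapped = textwrap.fill(
        original.strip(), width=110, initial_indent=" " * 8, subsequent_indent=" " * 8
    )
    print(long_lines(rewrapped))  # []
```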



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
             session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
             self.log.info("Deleted import errors for bundle %s which is no longer configured", name)
 
+        session.flush()
+

Review Comment:
   `session.flush()` here looks redundant: `session.execute(delete(...))` has 
already hit the DB, and `@provide_session` will flush/commit on exit when it 
owns the session. If this is only meant to support the subsequent startup 
reassignment, it shouldn’t be necessary; consider removing it (or add a short 
comment explaining why an explicit flush is required).
   ```suggestion
   
   ```



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
             session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
             self.log.info("Deleted import errors for bundle %s which is no longer configured", name)
 
+        session.flush()
+
+    @provide_session
+    def reassign_dags_with_unconfigured_bundles(self, *, session: Session = NEW_SESSION) -> int:
+        """
+        Reassign Dags that reference unconfigured bundles (None or incorrect) to the first configured bundle as a fallback.
+
+        This addresses Dags that reference bundles incorrectly (i.e. the Dag's `bundle_name` matches a value in the database, but that database value does not correspond to a user-configured bundle) as a side effect of the
+        `0082_3_1_0_make_bundle_name_not_nullable.py` migration (see: https://github.com/apache/airflow/issues/63323).
+
+        Instead of attempting to infer the correct bundle for each Dag during the migration, we reassign all Dags with unconfigured bundles to the first configured bundle at DagFileProcessorManager startup. This relaxes the
+        "Requested bundle '{name}' is not configured."
+        error that would otherwise occur when triggering a DagRun immediately after the migration.
+
+        This fallback is not always semantically correct in environments using multiple bundles, but it is a safe, temporary measure that allows users to successfully trigger DagRuns right after the migration.
+
+        The correct Dag-to-bundle assignments will be restored by the Dag processor on the next parsing cycle.
+
+        :param session: ORM Session
+        :return: Number of Dags reassigned.
+        """
+        from airflow.models.dag import DagModel
+        # lazy import to avoid circular import issues
+
+        configured_names = self.bundle_names
+        if not configured_names:
+            # This should not happen because we already have validation at parse_config in constructor.
+            raise AirflowConfigException(
+                "No Dag bundles are currently configured. Cannot reassign Dags with unconfigured bundles to a valid bundle. Please add at least one bundle configuration to your config."
+            )
+        default_bundle = configured_names[0]
+
+        count = cast(
+            "CursorResult",
+            session.execute(
+                update(DagModel)
+                .where(
+                    or_(
+                        DagModel.bundle_name.notin_(configured_names),
+                        DagModel.bundle_name.is_(None),
+                    )
+                )
+                .values(bundle_name=default_bundle)
+            ),

Review Comment:
   For consistency with other bulk updates on `DagModel` in the codebase (e.g. 
`airflow/dag_processing/collection.py` and core API routes), add 
`.execution_options(synchronize_session="fetch")` to this `update(DagModel)` 
statement so callers that pass an existing session don’t end up with stale 
in-memory `DagModel` objects.



##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -1249,6 +1249,18 @@ def test_add_callback_skips_when_bundle_init_fails(self, mock_bundle_manager):
         bundle.initialize.assert_called_once()
         assert len(manager._callback_to_execute) == 0
 
+    @mock.patch("airflow.dag_processing.manager.DagBundlesManager")
+    def test_reassign_called_once_at_startup_not_on_refresh(self, mock_bundle_manager):
+        """reassign_dags_with_unconfigured_bundles is called exactly once by sync_bundles, not by _refresh_dag_bundles."""

Review Comment:
   This new test docstring line is likely over the repo’s 110-character line 
length limit (ruff E501). Please wrap it across multiple lines to avoid lint 
failures.
   ```suggestion
           """
           reassign_dags_with_unconfigured_bundles is called exactly once by sync_bundles,
           not by _refresh_dag_bundles.
           """
   ```



##########
airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -449,3 +457,161 @@ def test_multiple_bundles_one_fails(clear_db, session):
 
 def test_get_all_bundle_names():
     assert DagBundlesManager().get_all_bundle_names() == ["dags-folder", "example_dags"]
+
+
+@pytest.fixture
+def clear_dags_and_bundles():
+    clear_db_dags()
+    clear_db_dag_bundles()
+    yield
+    clear_db_dags()
+    clear_db_dag_bundles()
+
+
+def _add_dag(session, dag_id: str, bundle_name: str) -> DagModel:
+    dag = DagModel(dag_id=dag_id, bundle_name=bundle_name, fileloc=f"/tmp/{dag_id}.py")
+    session.add(dag)
+    session.flush()
+    return dag
+
+
+@pytest.mark.db_test
+class TestReassignDagsWithUnconfiguredBundles:
+    """Tests for DagBundlesManager.reassign_dags_with_unconfigured_bundles."""
+
+    def _manager_with_bundle_names(self, names: list[str]) -> DagBundlesManager:
+        """Return a DagBundlesManager whose ``bundle_names`` property returns *names*.
+
+        :param names: Bundle names to expose via the ``bundle_names`` property.
+        :return: A patched ``DagBundlesManager`` instance.
+        """
+        with patch.dict(
+            os.environ,
+            {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(BASIC_BUNDLE_CONFIG)},
+        ):
+            manager = DagBundlesManager()
+        # Override bundle_names so the method uses the names we want without
+        # requiring a full bundle config for each name variant.
+        manager.__class__ = type(
+            "PatchedManager", (DagBundlesManager,), {"bundle_names": property(lambda self: names)}
+        )
+        return manager
+
+    def test_no_configured_bundles_raises(self, clear_dags_and_bundles, session):
+        """Raise AirflowConfigException when no bundles are configured."""
+        manager = self._manager_with_bundle_names([])
+        with pytest.raises(AirflowConfigException, match="No Dag bundles are currently configured"):
+            manager.reassign_dags_with_unconfigured_bundles(session=session)

Review Comment:
   This test asserts that `reassign_dags_with_unconfigured_bundles()` should 
raise when no bundles are configured, but `DagBundlesManager` currently permits 
an explicitly empty `dag_bundle_config_list` (and `DagFileProcessorManager` can 
run with no bundles). Since `sync_bundles()` now calls reassignment 
unconditionally at startup, raising here will crash the Dag processor in that 
configuration. Consider changing the behavior (and this test) to a no-op that 
returns 0 when no bundles are configured.
   ```suggestion
       def test_no_configured_bundles_is_noop(self, clear_dags_and_bundles, session):
           """Return 0 without raising when no bundles are configured."""
           manager = self._manager_with_bundle_names([])
           assert manager.reassign_dags_with_unconfigured_bundles(session=session) == 0
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
