jedcunningham commented on code in PR #55973:
URL: https://github.com/apache/airflow/pull/55973#discussion_r2398880513


##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -83,6 +83,102 @@ def test_validate_executor_field():
         _validate_executor_fields(dag)
 
 
+@pytest.fixture
+def mock_dag_bundle_manager():
+    """Fixture to create a mock DAG bundle manager with team configuration."""
+    from airflow.dag_processing.bundles.base import BaseDagBundle
+    from airflow.dag_processing.bundles.manager import _InternalBundleConfig
+
+    class MockDagBundle(BaseDagBundle):
+        @property
+        def path(self):
+            return None
+
+        def get_current_version(self):
+            return "1.0.0"
+
+        def refresh(self):
+            pass
+
+    @contextlib.contextmanager
+    def _create_bundle_manager(bundle_name="test_bundle", 
team_name="test_team"):
+        mock_bundle_config = _InternalBundleConfig(bundle_class=MockDagBundle, 
kwargs={}, team_name=team_name)
+
+        bundle_config = {bundle_name: mock_bundle_config}
+
+        with patch("airflow.dag_processing.bundles.manager.DagBundlesManager") 
as mock_manager_class:
+            mock_manager = mock_manager_class.return_value
+            mock_manager._bundle_config = bundle_config
+            yield mock_manager
+
+    return _create_bundle_manager
+
+
+@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+def test_validate_executor_field_with_team_restriction(mock_executor_lookup, 
mock_dag_bundle_manager):
+    """Test executor validation with team-based restrictions using bundle 
configuration."""
+    with mock_dag_bundle_manager():
+        mock_executor_lookup.side_effect = UnknownExecutorException("Executor 
not available for team")
+
+        with DAG("test-dag", schedule=None) as dag:
+            # Simulate DAG having bundle_name set during parsing
+            dag.bundle_name = "test_bundle"

Review Comment:
   Is this actually set on dag during parsing though? I didn't think it was.



##########
airflow-core/src/airflow/dag_processing/dagbag.py:
##########
@@ -135,12 +135,44 @@ def handle_timeout(signum, frame):
 
 
 def _validate_executor_fields(dag: DAG) -> None:
+    """Validate that executors specified in tasks are available and owned by 
the same team as the dag bundle."""
+    import logging
+
+    log = logging.getLogger(__name__)
+    dag_team_name = None
+
+    # Get team name from bundle configuration if available
+    if hasattr(dag, "bundle_name") and dag.bundle_name:
+        try:
+            from airflow.dag_processing.bundles.manager import 
DagBundlesManager
+
+            bundle_manager = DagBundlesManager()
+            bundle_config = bundle_manager._bundle_config[dag.bundle_name]
+            # TODO[multi-team] Raise exceptions below instead of logging once 
we have a multi-team feature flag configuration
+            dag_team_name = bundle_config.team_name
+            log.debug(

Review Comment:
   Should we only log if we actually found a team name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to