o-nikolas commented on code in PR #61646:
URL: https://github.com/apache/airflow/pull/61646#discussion_r2785087436


##########
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py:
##########
@@ -50,17 +49,34 @@
     # Task tuple to send to be executed
     TaskTuple = tuple[TaskInstanceKey, CommandType, str | None, Any | None]
 
-PARALLELISM: int = conf.getint("core", "PARALLELISM")
-DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
-
 
 class EdgeExecutor(BaseExecutor):
     """Implementation of the EdgeExecutor to distribute work to Edge Workers via HTTP."""
 
-    def __init__(self, parallelism: int = PARALLELISM):
-        super().__init__(parallelism=parallelism)
+    supports_multi_team: bool = True
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
         self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {}
 
+        # Check if self has the ExecutorConf set on the self.conf attribute, and if not, set it to the global
+        # configuration object. This allows the changes to be backwards compatible with older versions of
+        # Airflow.
+        # Can be removed when minimum supported provider version is equal to the version of core airflow
+        # which introduces multi-team configuration.
+        if not hasattr(self, "conf"):
+            from airflow.configuration import conf as global_conf
+
+            self.conf = global_conf
+
+        # Track queues managed by this executor instance for multi-team isolation.
+        # In a multi-team setup, each executor should only manage jobs and workers
+        # associated with its own queues, not those of other teams.
+        # Initialize with the default queue from (possibly team-specific) config.
+        self._managed_queues: set[str] = set()
+        default_queue = self.conf.get_mandatory_value("operators", "default_queue")
+        self._managed_queues.add(default_queue)

Review Comment:
   Re the queue discussion from the other thread on this PR, we shouldn't use 
queues this way, full stop.
   
   But just commenting to reply to @jscheffl: 
   
   > I am not sure whether it makes sense to start multiple instances of the 
same code just to have each instance only managing a few queues. 
   
   Indeed, each executor instance is associated with exactly one team. This
   keeps the changes transparent, since the logic inside the executor does not
   need to change (otherwise there would be a lot of branching and
   task-management logic to keep teams separate). It also keeps the executor
   code backwards compatible, without further branching for pre- and
   post-multi-team support. Lastly, it is a nice boundary to keep: the only
   place tasks from different teams mix is inside the scheduler logic (which is
   shared, of course), and once a task enters an executor instance it stays
   within one team all the way to task completion.
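   
   A minimal sketch of that boundary, purely for illustration: `TeamExecutor`
   and `Scheduler` are hypothetical stand-ins, not Airflow classes, and the
   routing shown here is an assumption about the shape of the design rather
   than the actual scheduler code.

```python
from dataclasses import dataclass, field


@dataclass
class TeamExecutor:
    """Hypothetical stand-in for one executor instance bound to one team."""

    team: str
    queued: list[str] = field(default_factory=list)

    def queue_task(self, task_id: str) -> None:
        # No team checks or branching here: every task this instance
        # receives already belongs to its own team.
        self.queued.append(task_id)


class Scheduler:
    """Hypothetical shared scheduler: the only place teams' tasks mix."""

    def __init__(self, teams: list[str]) -> None:
        # One executor instance per team.
        self.executors = {team: TeamExecutor(team) for team in teams}

    def dispatch(self, tasks: list[tuple[str, str]]) -> None:
        # Route each (team, task_id) pair to that team's executor.
        for team, task_id in tasks:
            self.executors[team].queue_task(task_id)


scheduler = Scheduler(["team_a", "team_b"])
scheduler.dispatch(
    [("team_a", "extract"), ("team_b", "report"), ("team_a", "load")]
)
```

   In this sketch the per-team separation lives entirely in `dispatch`, so
   the executor body stays the same whether one team or many are configured.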
   
   We could re-evaluate this in later releases if you still feel strongly 
otherwise.


