o-nikolas commented on code in PR #61646:
URL: https://github.com/apache/airflow/pull/61646#discussion_r2785087436
##########
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py:
##########
@@ -50,17 +49,34 @@
# Task tuple to send to be executed
TaskTuple = tuple[TaskInstanceKey, CommandType, str | None, Any | None]
-PARALLELISM: int = conf.getint("core", "PARALLELISM")
-DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
-
class EdgeExecutor(BaseExecutor):
"""Implementation of the EdgeExecutor to distribute work to Edge Workers
via HTTP."""
- def __init__(self, parallelism: int = PARALLELISM):
- super().__init__(parallelism=parallelism)
+ supports_multi_team: bool = True
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {}
+ # Check if self has the ExecutorConf set on the self.conf attribute,
and if not, set it to the global
+ # configuration object. This allows the changes to be backwards
compatible with older versions of
+ # Airflow.
+ # Can be removed when minimum supported provider version is equal to
the version of core airflow
+ # which introduces multi-team configuration.
+ if not hasattr(self, "conf"):
+ from airflow.configuration import conf as global_conf
+
+ self.conf = global_conf
+
+ # Track queues managed by this executor instance for multi-team
isolation.
+ # In a multi-team setup, each executor should only manage jobs and
workers
+ # associated with its own queues, not those of other teams.
+ # Initialize with the default queue from (possibly team-specific)
config.
+ self._managed_queues: set[str] = set()
+ default_queue = self.conf.get_mandatory_value("operators",
"default_queue")
+ self._managed_queues.add(default_queue)
Review Comment:
Re the queue discussion from the other thread on this PR, we shouldn't use
queues this way, full stop.
But just commenting to reply to @jscheffl:
> I am not sure whether it makes sense to start multiple instances of the
same code just to have each instance only managing a few queues.
Indeed, each executor instance owns/is associated with one team only. This keeps the
changes very transparent, since the logic inside the executor does not need to change
(otherwise there would be a lot of branching and task management logic to keep teams
separate). It also lets the executor code stay nicely backwards compatible (without even
more branching logic for pre- and post-multi-team support). And lastly, it is a nice
boundary to keep: the only place tasks from different teams mix is inside the scheduler
logic (which is shared, of course); once a task enters an executor instance, it stays
with that team all the way to task completion.
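For illustration only, a minimal sketch of the "one executor instance per team" idea
(the `load_team_conf` helper, the team names, and the wiring below are hypothetical and
not part of this PR; a multi-team core would supply the team-scoped conf itself):

```python
# Hypothetical sketch: one EdgeExecutor instance per team, each holding its own
# (possibly team-specific) configuration object. Names below are illustrative
# assumptions, not part of this PR or of core Airflow.
from airflow.configuration import conf as global_conf
from airflow.providers.edge3.executors.edge_executor import EdgeExecutor


def load_team_conf(team_name: str):
    # Placeholder: a multi-team core would return a team-scoped configuration
    # object here; for the sketch we simply reuse the global conf.
    return global_conf


executors = {}
for team_name in ("team_a", "team_b"):
    executor = EdgeExecutor()
    # On older cores the __init__ fallback above already set executor.conf to the
    # global conf; a multi-team core would provide the team-scoped conf instead.
    executor.conf = load_team_conf(team_name)
    executors[team_name] = executor

# The shared scheduler decides which executor instance receives a task; from that
# point on, queueing, worker assignment and state tracking stay within one team.
```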
We could re-evaluate this in later releases if you still feel strongly
otherwise.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]