ferruzzi commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r2928008788
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -184,8 +185,7 @@ def __init__(self, parallelism: int = PARALLELISM, team_name: str | None = None)
self.parallelism: int = parallelism
self.team_name: str | None = team_name
- self.queued_tasks: dict[TaskInstanceKey, workloads.ExecuteTask] = {}
- self.queued_callbacks: dict[str, workloads.ExecuteCallback] = {}
+ self.executor_queues: dict[str, dict] = defaultdict(dict)
Review Comment:
Can you expand that second dict? I think it's
```suggestion
self.executor_queues: dict[str, dict[WorkloadKey, workloads.All]] = defaultdict(dict)
```
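To illustrate the fully annotated form, here is a minimal self-contained sketch. `WorkloadKey` and `Workload` below are stand-in aliases so the snippet runs on its own; the real code would use the PR's `WorkloadKey` and `workloads.All`.

```python
from collections import defaultdict
from typing import Any

# Stand-ins for illustration only (assumptions, not the Airflow types).
WorkloadKey = Any
Workload = Any

# Outer key: workload type name. Inner dict: queue key -> workload.
executor_queues: dict[str, dict[WorkloadKey, Workload]] = defaultdict(dict)

# The inner dict is created on first access, so no per-type setup is needed.
executor_queues["ExecuteTask"]["ti-key-1"] = "task workload"
executor_queues["ExecuteCallback"]["cb-id-1"] = "callback workload"
```

With the inner type spelled out, a type checker can flag a wrong key or value type at the call site instead of treating the inner dict as untyped.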
##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -28,6 +29,23 @@
from airflow.api_fastapi.auth.tokens import JWTGenerator
+class WorkloadType(StrEnum):
+ """Central registry of all workload types."""
+
+ EXECUTE_TASK = "ExecuteTask"
+ EXECUTE_CALLBACK = "ExecuteCallback"
+ RUN_TRIGGER = "RunTrigger"
+
+
+# Central priority registry: Tuple is ordered from highest priority to lowest.
Review Comment:
These are only for executor workloads, right? You may want to add that to the
comment or the name. Who knows what workloads someone might add in the future,
maybe an async workload sent to the triggerer or something.
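A rough sketch of what scoping the name could look like. `EXECUTOR_WORKLOAD_PRIORITY` and `priority_of` are hypothetical names, the ordering is a placeholder rather than the PR's actual order, and leaving `RUN_TRIGGER` out is only there to show a non-executor workload being excluded.

```python
from enum import Enum


class WorkloadType(str, Enum):
    """Mirrors the enum in the diff; str/Enum mixin keeps the sketch portable."""

    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"
    RUN_TRIGGER = "RunTrigger"


# Executor workloads only, ordered from highest priority to lowest.
# Hypothetical name and placeholder ordering, not taken from the PR.
EXECUTOR_WORKLOAD_PRIORITY: tuple[WorkloadType, ...] = (
    WorkloadType.EXECUTE_CALLBACK,
    WorkloadType.EXECUTE_TASK,
)


def priority_of(workload_type: WorkloadType) -> int:
    """Return the rank (0 is highest); raises ValueError if not an executor workload."""
    return EXECUTOR_WORKLOAD_PRIORITY.index(workload_type)
```

The point is only that the name and the comment say "executor", so a workload added later for another component does not look like it was forgotten.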
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -220,47 +235,29 @@ def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey):
self._task_event_logs.append(Log(event=event, task_instance=ti_key, extra=extra))
def queue_workload(self, workload: workloads.All, session: Session) -> None:
- if isinstance(workload, workloads.ExecuteTask):
- ti = workload.ti
- self.queued_tasks[ti.key] = workload
- elif isinstance(workload, workloads.ExecuteCallback):
- if not self.supports_callbacks:
- raise NotImplementedError(
- f"{type(self).__name__} does not support ExecuteCallback workloads. "
- f"Set supports_callbacks = True and implement callback handling in _process_workloads(). "
- f"See LocalExecutor or CeleryExecutor for reference implementation."
- )
- self.queued_callbacks[workload.callback.id] = workload
- else:
- raise ValueError(
- f"Un-handled workload type {type(workload).__name__!r} in {type(self).__name__}. "
- f"Workload must be one of: ExecuteTask, ExecuteCallback."
+ if workload.type not in self.supported_workload_types:
+ raise NotImplementedError(
+ f"{type(self).__name__} does not support {workload.type!r} workloads. "
+ f"Add {workload.type!r} to supported_workload_types and implement handling "
+ f"in _process_workloads()."
)
+ self.executor_queues[workload.type][workload.queue_key] = workload
def _get_workloads_to_schedule(self, open_slots: int) -> list[tuple[WorkloadKey, workloads.All]]:
Review Comment:
😍 So much cleaner.
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -528,27 +510,24 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
return tis
Review Comment:
Do we not need `slots_available` anymore? That seems odd.
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -212,6 +212,21 @@ def __repr__(self):
_repr += ")"
return _repr
+ @property
Review Comment:
This should cover @jscheffl's concern from the other PR. I think it should
have a deprecation notice in here though so we can eventually start trimming it
out.
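Something along these lines is what I have in mind. This is a sketch with a hypothetical stand-in class, not the actual `BaseExecutor`; it uses the stdlib `DeprecationWarning`, and the project may prefer its own warning category and wording.

```python
import warnings
from collections import defaultdict


class SketchExecutor:
    """Hypothetical stand-in for the executor, for illustration only."""

    def __init__(self) -> None:
        self.executor_queues: dict[str, dict] = defaultdict(dict)

    @property
    def queued_tasks(self) -> dict:
        # Warn on every access so callers see where the legacy name is used.
        warnings.warn(
            "queued_tasks is deprecated; use executor_queues instead.",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller, not this property
        )
        return self.executor_queues["ExecuteTask"]
```

The same pattern would apply to `queued_callbacks`.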
##########
airflow-core/tests/unit/executors/test_base_executor.py:
##########
@@ -35,6 +35,7 @@
from airflow.executors import workloads
Review Comment:
I don't see any tests that cover the backcompat cases with queued_tasks and
queued_callbacks, can you add those?
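Roughly the shape I'd expect, sketched against a hypothetical `FakeExecutor` so it runs on its own. It assumes the legacy properties return the live inner dicts rather than copies; adjust if the PR does otherwise. The real tests would use `BaseExecutor` and real workloads.

```python
from collections import defaultdict


class FakeExecutor:
    """Hypothetical stand-in; the real tests would exercise BaseExecutor."""

    def __init__(self) -> None:
        self.executor_queues: dict[str, dict] = defaultdict(dict)

    @property
    def queued_tasks(self) -> dict:
        return self.executor_queues["ExecuteTask"]

    @property
    def queued_callbacks(self) -> dict:
        return self.executor_queues["ExecuteCallback"]


def test_queued_tasks_reflects_executor_queues():
    executor = FakeExecutor()
    executor.executor_queues["ExecuteTask"]["ti-key"] = "task workload"
    # The legacy view sees tasks only, not callbacks.
    assert executor.queued_tasks == {"ti-key": "task workload"}
    assert executor.queued_callbacks == {}


def test_legacy_writes_reach_executor_queues():
    executor = FakeExecutor()
    # Code that still writes through the old attribute keeps working.
    executor.queued_callbacks["cb-id"] = "callback workload"
    assert executor.executor_queues["ExecuteCallback"] == {"cb-id": "callback workload"}
```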
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.