ferruzzi commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r2928008788


##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -184,8 +185,7 @@ def __init__(self, parallelism: int = PARALLELISM, 
team_name: str | None = None)
 
         self.parallelism: int = parallelism
         self.team_name: str | None = team_name
-        self.queued_tasks: dict[TaskInstanceKey, workloads.ExecuteTask] = {}
-        self.queued_callbacks: dict[str, workloads.ExecuteCallback] = {}
+        self.executor_queues: dict[str, dict] = defaultdict(dict)

Review Comment:
   Can you expand that second dict?  I think it's 
   
   ```suggestion
           self.executor_queues: dict[str, dict[WorkloadKey, workloads.All] = 
defaultdict(dict)
   ```



##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -28,6 +29,23 @@
     from airflow.api_fastapi.auth.tokens import JWTGenerator
 
 
+class WorkloadType(StrEnum):
+    """Central registry of all workload types."""
+
+    EXECUTE_TASK = "ExecuteTask"
+    EXECUTE_CALLBACK = "ExecuteCallback"
+    RUN_TRIGGER = "RunTrigger"
+
+
+# Central priority registry: Tuple is ordered from highest priority to lowest.

Review Comment:
   These are only for executor workloads, right?  May want to add that to the 
comment or the name.  Who knows that Workloads someone might add in the future, 
maybe an async workload sent tot eh triggerer or something,.



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -220,47 +235,29 @@ def log_task_event(self, *, event: str, extra: str, 
ti_key: TaskInstanceKey):
         self._task_event_logs.append(Log(event=event, task_instance=ti_key, 
extra=extra))
 
     def queue_workload(self, workload: workloads.All, session: Session) -> 
None:
-        if isinstance(workload, workloads.ExecuteTask):
-            ti = workload.ti
-            self.queued_tasks[ti.key] = workload
-        elif isinstance(workload, workloads.ExecuteCallback):
-            if not self.supports_callbacks:
-                raise NotImplementedError(
-                    f"{type(self).__name__} does not support ExecuteCallback 
workloads. "
-                    f"Set supports_callbacks = True and implement callback 
handling in _process_workloads(). "
-                    f"See LocalExecutor or CeleryExecutor for reference 
implementation."
-                )
-            self.queued_callbacks[workload.callback.id] = workload
-        else:
-            raise ValueError(
-                f"Un-handled workload type {type(workload).__name__!r} in 
{type(self).__name__}. "
-                f"Workload must be one of: ExecuteTask, ExecuteCallback."
+        if workload.type not in self.supported_workload_types:
+            raise NotImplementedError(
+                f"{type(self).__name__} does not support {workload.type!r} 
workloads. "
+                f"Add {workload.type!r} to supported_workload_types and 
implement handling "
+                f"in _process_workloads()."
             )
+        self.executor_queues[workload.type][workload.queue_key] = workload
 
     def _get_workloads_to_schedule(self, open_slots: int) -> 
list[tuple[WorkloadKey, workloads.All]]:

Review Comment:
   😍   So much cleaner.



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -528,27 +510,24 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
         return tis
 

Review Comment:
   Do we not need `slots_available` anymore?   That seems odd



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -212,6 +212,21 @@ def __repr__(self):
         _repr += ")"
         return _repr
 
+    @property

Review Comment:
   This should cover @jscheffl's concern from the other PR.   I think it should 
have a deprecation notice in here though so we can eventually start trimming it 
out.



##########
airflow-core/tests/unit/executors/test_base_executor.py:
##########
@@ -35,6 +35,7 @@
 from airflow.executors import workloads

Review Comment:
   I don't see any tests that cover the backcompat cases with queued_tasks and 
queued_callbacks, can you add those?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to