kaxil commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r2963375864


##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -92,18 +92,13 @@ class CeleryExecutor(BaseExecutor):
     """
 
     supports_ad_hoc_ti_run: bool = True
-    supports_callbacks: bool = True
+    supported_workload_types: frozenset[str] = frozenset({"ExecuteTask", "ExecuteCallback"})

Review Comment:
   This uses raw strings `"ExecuteTask"` and `"ExecuteCallback"` instead of 
`WorkloadType.EXECUTE_TASK` and `WorkloadType.EXECUTE_CALLBACK` like 
`LocalExecutor` does. Works because `WorkloadType` is a `str` enum, but it's 
fragile if the values ever change.
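
   A minimal runnable sketch of the suggestion, with a local stand-in for `WorkloadType` (the real enum lives in airflow; names and values here are inferred from the review context):

```python
from enum import Enum


class WorkloadType(str, Enum):
    """Stand-in for airflow's WorkloadType str enum."""

    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"


# Referencing the members instead of raw strings means a renamed or
# changed value cannot silently drift out of sync with this set:
supported_workload_types: frozenset[str] = frozenset(
    {WorkloadType.EXECUTE_TASK, WorkloadType.EXECUTE_CALLBACK}
)

# Because WorkloadType mixes in str, each member still compares equal
# to its raw string value, so equality-based checks keep working:
assert WorkloadType.EXECUTE_TASK == "ExecuteTask"
assert WorkloadType("ExecuteCallback") is WorkloadType.EXECUTE_CALLBACK
assert WorkloadType.EXECUTE_CALLBACK in supported_workload_types
```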



##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -76,6 +94,23 @@ class BaseWorkloadSchema(BaseModel):
     def generate_token(sub_id: str, generator: JWTGenerator | None = None) -> str:
         return generator.generate({"sub": sub_id}) if generator else ""
 
+    @property
+    def queue_key(self) -> WorkloadKey:
+        """Return a unique key used to store/lookup this workload in the 
executor queue."""
+        raise NotImplementedError

Review Comment:
   Since all concrete workloads go through `BaseDagBundleWorkload(ABC)`, could 
this be declared `@abstractmethod` on that class instead? That would make a 
missing implementation fail with a `TypeError` as soon as the subclass is 
instantiated, rather than only when `queue_key` is first accessed at runtime.
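
   A minimal sketch of the `@abstractmethod` approach (hypothetical class names echoing the review, `WorkloadKey` stubbed as `str`):

```python
from abc import ABC, abstractmethod


class BaseDagBundleWorkload(ABC):
    @property
    @abstractmethod
    def queue_key(self) -> str:
        """Return a unique key used to store/lookup this workload."""


class GoodWorkload(BaseDagBundleWorkload):
    @property
    def queue_key(self) -> str:
        return "good-key"


class BadWorkload(BaseDagBundleWorkload):
    """Forgets to implement queue_key."""


assert GoodWorkload().queue_key == "good-key"

# The missing override surfaces the moment anyone tries to build the object:
try:
    BadWorkload()
except TypeError:
    pass  # abstract method queue_key not implemented
else:
    raise AssertionError("expected TypeError")
```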



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -63,11 +62,6 @@ class AwsLambdaExecutor(BaseExecutor):
 
     supports_multi_team: bool = True
 
-    if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:

Review Comment:
   Same pattern here -- `queue_workload` (line ~207) and `_process_workloads` 
(line ~221) still reference `self.queued_tasks`.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -81,11 +80,6 @@ class KubernetesExecutor(BaseExecutor):
     supports_ad_hoc_ti_run: bool = True
     supports_multi_team: bool = True
 
-    if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:

Review Comment:
   Same issue -- `queue_workload` (line ~235), `_process_workloads` (line 
~251), and `revoke_task`/`cleanup_stuck_queued_tasks` all still use 
`self.queued_tasks`.



##########
airflow-core/tests/unit/executors/test_base_executor.py:
##########
@@ -288,7 +289,6 @@ def test_debug_dump(caplog):
     executor = BaseExecutor()
     with caplog.at_level(logging.INFO):
         executor.debug_dump()
-    assert "executor.queued" in caplog.text
     assert "executor.running" in caplog.text

Review Comment:
   This assertion was removed, and with a fresh executor the `debug_dump` loop 
body never executes (empty `defaultdict`). Might be worth adding a test that 
populates the queues first and verifies the new `executor.queued[...]` format 
shows up.
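
   A self-contained sketch of such a test, using a stand-in executor so it runs standalone (the real test would use `BaseExecutor` and pytest's `caplog`; the `executor_queues` attribute name and the `executor.queued[...]` log format are assumptions from this PR):

```python
import logging
from collections import defaultdict


class _Capture(logging.Handler):
    """Tiny caplog substitute: collects formatted log messages."""

    def __init__(self):
        super().__init__()
        self.text = ""

    def emit(self, record):
        self.text += record.getMessage() + "\n"


class FakeExecutor:
    """Models the assumed new layout: one queue dict per workload type."""

    def __init__(self):
        self.executor_queues = defaultdict(dict)
        self.running = set()
        self.log = logging.getLogger("fake_executor")

    def debug_dump(self):
        # With a fresh executor this loop body never runs (empty defaultdict),
        # which is why the removed assertion could never have passed.
        for workload_type, queue in self.executor_queues.items():
            self.log.info("executor.queued[%s] (%s)\n\t%s", workload_type, len(queue), queue)
        self.log.info("executor.running (%s)\n\t%s", len(self.running), self.running)


capture = _Capture()
executor = FakeExecutor()
executor.log.addHandler(capture)
executor.log.setLevel(logging.INFO)

executor.executor_queues["ExecuteTask"]["key-1"] = "workload-1"  # populate first
executor.debug_dump()

assert "executor.queued[ExecuteTask]" in capture.text
assert "executor.running" in capture.text
```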



##########
airflow-core/src/airflow/executors/workloads/types.py:
##########
@@ -20,6 +20,8 @@
 
 from typing import TYPE_CHECKING, TypeAlias
 
+from airflow.executors.workloads.callback import ExecuteCallback

Review Comment:
   `QueueableWorkload` is only consumed under `TYPE_CHECKING` (in 
`base_executor.py`). These imports and the alias could live under 
`TYPE_CHECKING` too, like `WorkloadKey` above.
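
   Roughly this shape (the `ExecuteTask` module path is an assumption; the guarded block never executes at runtime, so it costs nothing and avoids import cycles):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Resolved only by static type checkers, never imported at runtime.
    from typing import TypeAlias

    from airflow.executors.workloads.callback import ExecuteCallback
    from airflow.executors.workloads.execute_task import ExecuteTask  # path assumed

    QueueableWorkload: TypeAlias = ExecuteTask | ExecuteCallback


# Runtime code can still annotate with the alias; thanks to the
# __future__ import, annotations stay as strings and are never evaluated:
def queue_workload(workload: QueueableWorkload) -> None: ...
```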



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py:
##########
@@ -92,11 +91,6 @@ class AwsBatchExecutor(BaseExecutor):
     # AWS only allows a maximum number of JOBs in the describe_jobs function
     DESCRIBE_JOBS_BATCH_SIZE = 99
 
-    if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:

Review Comment:
   The type annotation is removed, but `queue_workload` (line ~127) and 
`_process_workloads` (line ~141) still access `self.queued_tasks`, which now 
fires a `RemovedInAirflow4Warning` on every call. Should these use 
`self.executor_queues[WorkloadType.EXECUTE_TASK]` directly, or can the 
overrides be removed entirely if the base class handles it?
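
   A runnable sketch of the pattern being described, with local stand-ins for `RemovedInAirflow4Warning` and the assumed `executor_queues` layout, showing why the leftover `self.queued_tasks` accesses warn while direct access does not:

```python
import warnings
from collections import defaultdict


class RemovedInAirflow4Warning(DeprecationWarning):
    """Stand-in for airflow's deprecation warning class."""


class BaseExecutorSketch:
    """Models the assumed new layout: one queue dict per workload type."""

    def __init__(self):
        self.executor_queues = defaultdict(dict)

    @property
    def queued_tasks(self):
        # Hypothetical back-compat shim: every read goes through here.
        warnings.warn(
            "queued_tasks is deprecated; use executor_queues[...] instead",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return self.executor_queues["ExecuteTask"]


executor = BaseExecutorSketch()

# The direct access the review suggests -- no warning:
executor.executor_queues["ExecuteTask"]["key-1"] = "workload-1"

# The leftover pattern in queue_workload/_process_workloads -- warns per call:
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    assert executor.queued_tasks["key-1"] == "workload-1"
assert any(issubclass(w.category, RemovedInAirflow4Warning) for w in caught)
```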



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##########
@@ -95,11 +95,6 @@ class AwsEcsExecutor(BaseExecutor):
     # AWS limits the maximum number of ARNs in the describe_tasks function.
     DESCRIBE_TASKS_BATCH_SIZE = 99
 
-    if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:

Review Comment:
   Same issue as the Batch executor -- `queue_workload` (line ~135) and 
`_process_workloads` (line ~150) still go through `self.queued_tasks`, 
triggering deprecation warnings in production.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
