ferruzzi commented on code in PR #63035:
URL: https://github.com/apache/airflow/pull/63035#discussion_r2914876060
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/utils.py:
##########
@@ -66,5 +65,5 @@ class AllLambdaConfigKeys(InvokeLambdaKwargsConfigKeys):
END_WAIT_TIMEOUT = "end_wait_timeout"
-CommandType = Sequence[str]
+CommandType = Sequence[Any]
Review Comment:
Can we narrow this down any?
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/utils.py:
##########
@@ -37,9 +36,9 @@
@dataclass
class LambdaQueuedTask:
- """Represents a Lambda task that is queued. The task will be run in the
next heartbeat."""
+ """Represents a Lambda workload that is queued. The task will be run in
the next heartbeat."""
- key: TaskInstanceKey
+ key: TaskInstanceKey | str
Review Comment:
Please use `WorkloadKey` here from
`airflow-core/src/airflow/executors/workloads/types.py`
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -461,6 +517,8 @@ def try_adopt_task_instances(self, tis:
Sequence[TaskInstance]) -> Sequence[Task
"""
Adopt task instances which have an external_executor_id (the
serialized task key).
+ The external_executor_id may contain either a serialized
TaskInstanceKey or a callback identifier string.
Review Comment:
WorkloadKey?
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -71,7 +76,10 @@ class AwsLambdaExecutor(BaseExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pending_tasks: deque = deque()
- self.running_tasks: dict[str, TaskInstanceKey] = {}
+ self.running_tasks: dict[str, TaskInstanceKey | str] = {}
Review Comment:
Same as below, `TaskInstanceKey | str` should be
`executors.workloads.types.WorkloadKey` instead
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -179,9 +187,9 @@ def load_connections(self, check_connection: bool = True):
def sync(self):
"""
- Sync the executor with the current state of tasks.
+ Sync the executor with the current state of workloads.
- Check in on currently running tasks and attempt to run any new tasks
that have been queued.
+ Check in on currently running workloads and attempt to run any new
workloads that have been queued.
Review Comment:
Here and elsewhere: As Niko pointed out in one of my recent PRs, the user
may not be familiar with the term `workload` yet, so it would be beneficial to
expand it a little so we can teach them. Consider:
```suggestion
Check in on currently running tasks and callbacks and attempt to run
any new workloads that have been queued.
```
That way we're kind of teaching them what a `workload` is without making
them go look it up.
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -205,55 +213,97 @@ def sync(self):
self.log.exception("An error occurred while syncing tasks")
def queue_workload(self, workload: workloads.All, session: Session | None)
-> None:
- from airflow.executors import workloads
- if not isinstance(workload, workloads.ExecuteTask):
- raise RuntimeError(f"{type(self)} cannot handle workloads of type
{type(workload)}")
- ti = workload.ti
- self.queued_tasks[ti.key] = workload
+ if isinstance(workload, workloads.ExecuteTask):
+ ti = workload.ti
+ self.queued_tasks[ti.key] = workload
+ return
+
+ if AIRFLOW_V_3_2_PLUS and isinstance(workload,
workloads.ExecuteCallback):
+ self.queued_callbacks[workload.callback.id] = workload
+ return
+
+ raise RuntimeError(f"{type(self)} cannot handle workloads of type
{type(workload)}")
+
+ def _process_workloads(self, workload_items: Sequence[workloads.All]) ->
None:
+
+ for w in workload_items:
+ key: TaskInstanceKey | str
+ command: list[workloads.All]
+ queue: str | None
+ if isinstance(w, workloads.ExecuteTask):
+ command = [w]
+ key = w.ti.key
+ queue = w.ti.queue
+ executor_config = w.ti.executor_config or {}
+
+ del self.queued_tasks[key]
- def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
- from airflow.executors.workloads import ExecuteTask
+ self.execute_async(
+ key=key,
+ command=command,
+ queue=queue,
+ executor_config=executor_config,
+ )
+
+ self.running.add(key)
+ continue
- for w in workloads:
- if not isinstance(w, ExecuteTask):
- raise RuntimeError(f"{type(self)} cannot handle workloads of
type {type(w)}")
+ if AIRFLOW_V_3_2_PLUS and isinstance(w, workloads.ExecuteCallback):
+ command = [w]
+ key = w.callback.id
+ queue = None
- command = [w]
- key = w.ti.key
- queue = w.ti.queue
- executor_config = w.ti.executor_config or {}
+ if isinstance(w.callback.data, dict) and "queue" in
w.callback.data:
+ queue = w.callback.data["queue"]
- del self.queued_tasks[key]
- self.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config) # type: ignore[arg-type]
- self.running.add(key)
+ del self.queued_callbacks[key]
- def execute_async(self, key: TaskInstanceKey, command: CommandType,
queue=None, executor_config=None):
+ self.execute_async(
+ key=key,
+ command=command,
+ queue=queue,
+ )
+
+ self.running.add(key)
+ continue
+
+ raise RuntimeError(f"{type(self)} cannot handle workloads of type
{type(w)}")
+
+ def execute_async(
+ self,
+ key: TaskInstanceKey | str,
+ command: CommandType | Sequence[workloads.All],
+ queue=None,
+ executor_config=None,
+ ):
"""
- Save the task to be executed in the next sync by inserting the
commands into a queue.
+ Save the workload to be executed in the next sync by inserting the
commands into a queue.
- :param key: A unique task key (typically a tuple identifying the task
instance).
+ :param key: Unique workload key. Task workloads use TaskInstanceKey,
callback workloads use a string id.
:param command: The shell command string to execute.
:param executor_config: (Unused) to keep the same signature as the
base.
:param queue: (Unused) to keep the same signature as the base.
"""
if len(command) == 1:
- from airflow.executors.workloads import ExecuteTask
-
- if isinstance(command[0], ExecuteTask):
- workload = command[0]
- ser_input = workload.model_dump_json()
- command = [
- "python",
- "-m",
- "airflow.sdk.execution_time.execute_workload",
- "--json-string",
- ser_input,
- ]
+ if AIRFLOW_V_3_2_PLUS:
+ if not isinstance(command[0], (workloads.ExecuteTask,
workloads.ExecuteCallback)):
+ raise RuntimeError(f"{type(self)} cannot handle workloads
of type {type(command[0])}")
else:
- raise RuntimeError(
- f"LambdaExecutor doesn't know how to handle workload of
type: {type(command[0])}"
- )
+ if not isinstance(command[0], workloads.ExecuteTask):
+ raise RuntimeError(f"{type(self)} cannot handle workloads
of type {type(command[0])}")
+
+ workload = command[0]
Review Comment:
Consider moving this `workload = command[0]` to the top of the method and
using `workload` throughout the method; it would clean things up a bit.
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -205,55 +213,97 @@ def sync(self):
self.log.exception("An error occurred while syncing tasks")
def queue_workload(self, workload: workloads.All, session: Session | None)
-> None:
- from airflow.executors import workloads
- if not isinstance(workload, workloads.ExecuteTask):
- raise RuntimeError(f"{type(self)} cannot handle workloads of type
{type(workload)}")
- ti = workload.ti
- self.queued_tasks[ti.key] = workload
+ if isinstance(workload, workloads.ExecuteTask):
+ ti = workload.ti
+ self.queued_tasks[ti.key] = workload
+ return
+
+ if AIRFLOW_V_3_2_PLUS and isinstance(workload,
workloads.ExecuteCallback):
+ self.queued_callbacks[workload.callback.id] = workload
+ return
+
+ raise RuntimeError(f"{type(self)} cannot handle workloads of type
{type(workload)}")
+
+ def _process_workloads(self, workload_items: Sequence[workloads.All]) ->
None:
+
+ for w in workload_items:
+ key: TaskInstanceKey | str
+ command: list[workloads.All]
+ queue: str | None
+ if isinstance(w, workloads.ExecuteTask):
+ command = [w]
+ key = w.ti.key
+ queue = w.ti.queue
+ executor_config = w.ti.executor_config or {}
+
+ del self.queued_tasks[key]
- def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
- from airflow.executors.workloads import ExecuteTask
+ self.execute_async(
+ key=key,
+ command=command,
+ queue=queue,
+ executor_config=executor_config,
+ )
+
+ self.running.add(key)
+ continue
- for w in workloads:
- if not isinstance(w, ExecuteTask):
- raise RuntimeError(f"{type(self)} cannot handle workloads of
type {type(w)}")
+ if AIRFLOW_V_3_2_PLUS and isinstance(w, workloads.ExecuteCallback):
+ command = [w]
+ key = w.callback.id
+ queue = None
- command = [w]
- key = w.ti.key
- queue = w.ti.queue
- executor_config = w.ti.executor_config or {}
+ if isinstance(w.callback.data, dict) and "queue" in
w.callback.data:
+ queue = w.callback.data["queue"]
- del self.queued_tasks[key]
- self.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config) # type: ignore[arg-type]
- self.running.add(key)
+ del self.queued_callbacks[key]
- def execute_async(self, key: TaskInstanceKey, command: CommandType,
queue=None, executor_config=None):
+ self.execute_async(
+ key=key,
+ command=command,
+ queue=queue,
+ )
+
+ self.running.add(key)
+ continue
+
+ raise RuntimeError(f"{type(self)} cannot handle workloads of type
{type(w)}")
+
+ def execute_async(
+ self,
+ key: TaskInstanceKey | str,
+ command: CommandType | Sequence[workloads.All],
+ queue=None,
+ executor_config=None,
+ ):
"""
- Save the task to be executed in the next sync by inserting the
commands into a queue.
+ Save the workload to be executed in the next sync by inserting the
commands into a queue.
- :param key: A unique task key (typically a tuple identifying the task
instance).
+ :param key: Unique workload key. Task workloads use TaskInstanceKey,
callback workloads use a string id.
:param command: The shell command string to execute.
:param executor_config: (Unused) to keep the same signature as the
base.
:param queue: (Unused) to keep the same signature as the base.
"""
if len(command) == 1:
- from airflow.executors.workloads import ExecuteTask
-
- if isinstance(command[0], ExecuteTask):
- workload = command[0]
- ser_input = workload.model_dump_json()
- command = [
- "python",
- "-m",
- "airflow.sdk.execution_time.execute_workload",
- "--json-string",
- ser_input,
- ]
+ if AIRFLOW_V_3_2_PLUS:
+ if not isinstance(command[0], (workloads.ExecuteTask,
workloads.ExecuteCallback)):
Review Comment:
can this be `if not isinstance(command[0], SchedulerWorkload)`, using the
SchedulerWorkload from executors/workloads/types.py? If not, then leave this
for now; @anishgirianish has a really cool PR in the works to simplify this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]