SameerMesiah97 commented on code in PR #63035:
URL: https://github.com/apache/airflow/pull/63035#discussion_r2927950734
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -205,55 +213,97 @@ def sync(self):
self.log.exception("An error occurred while syncing tasks")
def queue_workload(self, workload: workloads.All, session: Session | None)
-> None:
- from airflow.executors import workloads
- if not isinstance(workload, workloads.ExecuteTask):
- raise RuntimeError(f"{type(self)} cannot handle workloads of type
{type(workload)}")
- ti = workload.ti
- self.queued_tasks[ti.key] = workload
+ if isinstance(workload, workloads.ExecuteTask):
+ ti = workload.ti
+ self.queued_tasks[ti.key] = workload
+ return
+
+ if AIRFLOW_V_3_2_PLUS and isinstance(workload,
workloads.ExecuteCallback):
+ self.queued_callbacks[workload.callback.id] = workload
+ return
+
+ raise RuntimeError(f"{type(self)} cannot handle workloads of type
{type(workload)}")
+
+ def _process_workloads(self, workload_items: Sequence[workloads.All]) ->
None:
+
+ for w in workload_items:
+ key: TaskInstanceKey | str
+ command: list[workloads.All]
+ queue: str | None
+ if isinstance(w, workloads.ExecuteTask):
+ command = [w]
+ key = w.ti.key
+ queue = w.ti.queue
+ executor_config = w.ti.executor_config or {}
+
+ del self.queued_tasks[key]
- def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
- from airflow.executors.workloads import ExecuteTask
+ self.execute_async(
+ key=key,
+ command=command,
+ queue=queue,
+ executor_config=executor_config,
+ )
+
+ self.running.add(key)
+ continue
- for w in workloads:
- if not isinstance(w, ExecuteTask):
- raise RuntimeError(f"{type(self)} cannot handle workloads of
type {type(w)}")
+ if AIRFLOW_V_3_2_PLUS and isinstance(w, workloads.ExecuteCallback):
+ command = [w]
+ key = w.callback.id
+ queue = None
- command = [w]
- key = w.ti.key
- queue = w.ti.queue
- executor_config = w.ti.executor_config or {}
+ if isinstance(w.callback.data, dict) and "queue" in
w.callback.data:
+ queue = w.callback.data["queue"]
- del self.queued_tasks[key]
- self.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config) # type: ignore[arg-type]
- self.running.add(key)
+ del self.queued_callbacks[key]
- def execute_async(self, key: TaskInstanceKey, command: CommandType,
queue=None, executor_config=None):
+ self.execute_async(
+ key=key,
+ command=command,
+ queue=queue,
+ )
+
+ self.running.add(key)
+ continue
+
+ raise RuntimeError(f"{type(self)} cannot handle workloads of type
{type(w)}")
+
+ def execute_async(
+ self,
+ key: TaskInstanceKey | str,
+ command: CommandType | Sequence[workloads.All],
+ queue=None,
+ executor_config=None,
+ ):
"""
- Save the task to be executed in the next sync by inserting the
commands into a queue.
+ Save the workload to be executed in the next sync by inserting the
commands into a queue.
- :param key: A unique task key (typically a tuple identifying the task
instance).
+ :param key: Unique workload key. Task workloads use TaskInstanceKey,
callback workloads use a string id.
:param command: The shell command string to execute.
:param executor_config: (Unused) to keep the same signature as the
base.
:param queue: (Unused) to keep the same signature as the base.
"""
if len(command) == 1:
- from airflow.executors.workloads import ExecuteTask
-
- if isinstance(command[0], ExecuteTask):
- workload = command[0]
- ser_input = workload.model_dump_json()
- command = [
- "python",
- "-m",
- "airflow.sdk.execution_time.execute_workload",
- "--json-string",
- ser_input,
- ]
+ if AIRFLOW_V_3_2_PLUS:
+ if not isinstance(command[0], (workloads.ExecuteTask,
workloads.ExecuteCallback)):
+ raise RuntimeError(f"{type(self)} cannot handle workloads
of type {type(command[0])}")
else:
- raise RuntimeError(
- f"LambdaExecutor doesn't know how to handle workload of
type: {type(command[0])}"
- )
+ if not isinstance(command[0], workloads.ExecuteTask):
+ raise RuntimeError(f"{type(self)} cannot handle workloads
of type {type(command[0])}")
+
+ workload = command[0]
Review Comment:
`workload = command[0]` has been moved just below `if len(command) == 1
`otherwise it could throw an `IndexError` if `command` is an empty list. Not
at the top of the method but ithe function should look cleaner now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]