o-nikolas commented on code in PR #63035:
URL: https://github.com/apache/airflow/pull/63035#discussion_r2921362632


##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -205,55 +213,97 @@ def sync(self):
             self.log.exception("An error occurred while syncing tasks")
 
     def queue_workload(self, workload: workloads.All, session: Session | None) 
-> None:
-        from airflow.executors import workloads
 
-        if not isinstance(workload, workloads.ExecuteTask):
-            raise RuntimeError(f"{type(self)} cannot handle workloads of type 
{type(workload)}")
-        ti = workload.ti
-        self.queued_tasks[ti.key] = workload
+        if isinstance(workload, workloads.ExecuteTask):
+            ti = workload.ti
+            self.queued_tasks[ti.key] = workload
+            return
+
+        if AIRFLOW_V_3_2_PLUS and isinstance(workload, 
workloads.ExecuteCallback):
+            self.queued_callbacks[workload.callback.id] = workload
+            return
+
+        raise RuntimeError(f"{type(self)} cannot handle workloads of type 
{type(workload)}")
+
+    def _process_workloads(self, workload_items: Sequence[workloads.All]) -> 
None:
+
+        for w in workload_items:
+            key: TaskInstanceKey | str
+            command: list[workloads.All]
+            queue: str | None
+            if isinstance(w, workloads.ExecuteTask):
+                command = [w]
+                key = w.ti.key
+                queue = w.ti.queue
+                executor_config = w.ti.executor_config or {}
+
+                del self.queued_tasks[key]
 
-    def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
-        from airflow.executors.workloads import ExecuteTask
+                self.execute_async(
+                    key=key,
+                    command=command,
+                    queue=queue,
+                    executor_config=executor_config,
+                )
+
+                self.running.add(key)
+                continue
 
-        for w in workloads:
-            if not isinstance(w, ExecuteTask):
-                raise RuntimeError(f"{type(self)} cannot handle workloads of 
type {type(w)}")
+            if AIRFLOW_V_3_2_PLUS and isinstance(w, workloads.ExecuteCallback):
+                command = [w]
+                key = w.callback.id
+                queue = None
 
-            command = [w]
-            key = w.ti.key
-            queue = w.ti.queue
-            executor_config = w.ti.executor_config or {}
+                if isinstance(w.callback.data, dict) and "queue" in 
w.callback.data:
+                    queue = w.callback.data["queue"]
 
-            del self.queued_tasks[key]
-            self.execute_async(key=key, command=command, queue=queue, 
executor_config=executor_config)  # type: ignore[arg-type]
-            self.running.add(key)
+                del self.queued_callbacks[key]
 
-    def execute_async(self, key: TaskInstanceKey, command: CommandType, 
queue=None, executor_config=None):
+                self.execute_async(
+                    key=key,
+                    command=command,
+                    queue=queue,
+                )
+
+                self.running.add(key)
+                continue
+
+            raise RuntimeError(f"{type(self)} cannot handle workloads of type 
{type(w)}")
+
+    def execute_async(
+        self,
+        key: TaskInstanceKey | str,

Review Comment:
   `WorkloadKey`



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -473,13 +531,12 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
             ]:
                 for ti, ser_task_key in serialized_task_keys:
                     try:
-                        task_key = 
TaskInstanceKey.from_dict(json.loads(ser_task_key))
+                        data = json.loads(ser_task_key)
+                        task_key = TaskInstanceKey.from_dict(data)
                     except Exception:
-                        # If that task fails to deserialize, we should just 
skip it.
-                        self.log.exception(
-                            "Task failed to be adopted because the key could 
not be deserialized"
-                        )
-                        continue
+                        # Callback workloads use string keys.
+                        task_key = ser_task_key
+

Review Comment:
   @ferruzzi can callbacks be adopted? If not, then we don't need these changes
here — or, at most, we only need to discard callbacks (we probably don't want to
log an exception for each one).



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -263,7 +313,7 @@ def execute_async(self, key: TaskInstanceKey, command: 
CommandType, queue=None,
 
     def attempt_task_runs(self):

Review Comment:
   Rename this method?



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -339,20 +395,20 @@ def attempt_task_runs(self):
                     self.fail(task_key)
             else:
                 status_code = response.get("StatusCode")
-                self.log.info("Invoked Lambda for task %s with status %s", 
task_key, status_code)
+                self.log.info("Invoked Lambda for workload %s with status %s", 
task_key, status_code)
                 self.running_tasks[ser_task_key] = task_key
                 # Add the serialized task key as the info, this will be 
assigned on the ti as the external_executor_id
                 self.running_state(task_key, ser_task_key)
 
     def sync_running_tasks(self):

Review Comment:
   Should this method be renamed?



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -411,7 +467,7 @@ def process_queue(self, queue_url: str):
                 task_key = self.running_tasks[ser_task_key]
             except KeyError:

Review Comment:
   Shouldn't all these mentions of `task(s)` in variables and comments be 
updated?



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -426,12 +482,12 @@ def process_queue(self, queue_url: str):
 
             if task_key:
                 if return_code == 0:
-                    self.success(task_key)
+                    self.success(task_key)  # type: ignore[arg-type]

Review Comment:
   Why are we suppressing this with `type: ignore` instead of getting the typing right?
   
   This applies here and just below as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to