ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2791436144


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1041,21 +1111,44 @@ def process_executor_events(
         ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] 
= {}
         event_buffer = executor.get_event_buffer()
         tis_with_right_state: list[TaskInstanceKey] = []
-
-        # Report execution
-        for ti_key, (state, _) in event_buffer.items():
-            # We create map (dag_id, task_id, logical_date) -> in-memory 
try_number
-            ti_primary_key_to_try_number_map[ti_key.primary] = 
ti_key.try_number
-
-            cls.logger().info("Received executor event with state %s for task 
instance %s", state, ti_key)
-            if state in (
-                TaskInstanceState.FAILED,
-                TaskInstanceState.SUCCESS,
-                TaskInstanceState.QUEUED,
-                TaskInstanceState.RUNNING,
-                TaskInstanceState.RESTARTING,
-            ):
-                tis_with_right_state.append(ti_key)
+        callback_keys_with_events: list[str] = []
+
+        # Report execution - handle both task and callback events
+        for key, (state, _) in event_buffer.items():
+            if isinstance(key, TaskInstanceKey):
+                ti_primary_key_to_try_number_map[key.primary] = key.try_number
+                cls.logger().info("Received executor event with state %s for 
task instance %s", state, key)
+                if state in (
+                    TaskInstanceState.FAILED,
+                    TaskInstanceState.SUCCESS,
+                    TaskInstanceState.QUEUED,
+                    TaskInstanceState.RUNNING,
+                    TaskInstanceState.RESTARTING,
+                ):
+                    tis_with_right_state.append(key)
+            else:
+                # Callback event (key is string UUID)
+                cls.logger().info("Received executor event with state %s for 
callback %s", state, key)
+                if state in (TaskInstanceState.FAILED, 
TaskInstanceState.SUCCESS):
+                    callback_keys_with_events.append(key)
+
+        # Handle callback completion events
+        for callback_id in callback_keys_with_events:
+            state, info = event_buffer.pop(callback_id)
+            callback = session.get(Callback, callback_id)
+            if callback:

Review Comment:
   There's a possible edge case: if the dagrun is deleted, the callback is 
removed along with it, since it is on a cascading delete in the database. I 
don't know how likely that is, but I'll refactor it to be a bit clearer and 
add a comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to