ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2748511378
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -945,21 +1042,44 @@ def process_executor_events(
ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int]
= {}
event_buffer = executor.get_event_buffer()
tis_with_right_state: list[TaskInstanceKey] = []
-
- # Report execution
- for ti_key, (state, _) in event_buffer.items():
- # We create map (dag_id, task_id, logical_date) -> in-memory
try_number
- ti_primary_key_to_try_number_map[ti_key.primary] =
ti_key.try_number
-
- cls.logger().info("Received executor event with state %s for task
instance %s", state, ti_key)
- if state in (
- TaskInstanceState.FAILED,
- TaskInstanceState.SUCCESS,
- TaskInstanceState.QUEUED,
- TaskInstanceState.RUNNING,
- TaskInstanceState.RESTARTING,
- ):
- tis_with_right_state.append(ti_key)
+ callback_keys_with_events: list[str] = []
+
+ # Report execution - handle both task and callback events
+ for key, (state, _) in event_buffer.items():
+ if isinstance(key, TaskInstanceKey):
+ ti_primary_key_to_try_number_map[key.primary] = key.try_number
+ cls.logger().info("Received executor event with state %s for
task instance %s", state, key)
+ if state in (
+ TaskInstanceState.FAILED,
+ TaskInstanceState.SUCCESS,
+ TaskInstanceState.QUEUED,
+ TaskInstanceState.RUNNING,
+ TaskInstanceState.RESTARTING,
+ ):
+ tis_with_right_state.append(key)
+ else:
+ # Callback event (key is string UUID)
+ cls.logger().info("Received executor event with state %s for
callback %s", state, key)
+ if state in (TaskInstanceState.FAILED,
TaskInstanceState.SUCCESS):
+ callback_keys_with_events.append(key)
+
+ # Handle callback completion events
+ for callback_id in callback_keys_with_events:
+ state, info = event_buffer.pop(callback_id)
+ callback = session.get(Callback, callback_id)
+ if callback:
+ # Note: We receive TaskInstanceState from executor
(SUCCESS/FAILED) but convert to CallbackState here.
+ # This is intentional - executor layer uses generic completion
states, scheduler converts to proper types.
+ if state == TaskInstanceState.SUCCESS:
Review Comment:
That's up now, let me know what you think
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]