ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2791260039
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -495,24 +534,26 @@ def running_state(self, key: TaskInstanceKey, info=None)
-> None:
"""
self.change_state(key, TaskInstanceState.RUNNING, info,
remove_running=False)
- def get_event_buffer(self, dag_ids=None) -> dict[TaskInstanceKey,
EventBufferValueType]:
+ def get_event_buffer(self, dag_ids=None) -> dict[WorkloadKey,
EventBufferValueType]:
"""
Return and flush the event buffer.
In case dag_ids is specified it will only return and flush events
for the given dag_ids. Otherwise, it returns and flushes all events.
+ Note: Callback events (with string keys) are always included
regardless of dag_ids filter.
:param dag_ids: the dag_ids to return events for; returns all if given
``None``.
:return: a dict of events
"""
- cleared_events: dict[TaskInstanceKey, EventBufferValueType] = {}
+ cleared_events: dict[WorkloadKey, EventBufferValueType] = {}
if dag_ids is None:
cleared_events = self.event_buffer
self.event_buffer = {}
else:
- for ti_key in list(self.event_buffer.keys()):
- if ti_key.dag_id in dag_ids:
- cleared_events[ti_key] = self.event_buffer.pop(ti_key)
+ for key in list(self.event_buffer.keys()):
+ # Include if it's a callback (string key) or if it's a task in
the specified dags
+ if isinstance(key, str) or key.dag_id in dag_ids:
Review Comment:
I did partially think of that and made the CallbackKey type alias.
It's just a type alias for a `str` (for now?), and it was a later addition;
I should have gone back and used it here when I created it. I'll make that
change.
But you are also right that in the future, other workload types may use
strings and we may want to adjust this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]