hterik commented on code in PR #26639:
URL: https://github.com/apache/airflow/pull/26639#discussion_r1071796755
##########
airflow/executors/kubernetes_executor.py:
##########
@@ -599,8 +655,31 @@ def sync(self) -> None:
raise AirflowException(NOT_STARTED_MESSAGE)
self.kube_scheduler.sync()
- last_resource_version = None
- while True:
+ """processing result queue"""
+ multi_threads_queue_process(
+ queue_size=self.result_queue.qsize(),
+ queue_type='result',
+ process_method=self.process_result_queue,
Review Comment:
Today they might only use the `[]` and `in` operators, which are thread-safe
in isolation, but tomorrow someone might add a for-loop or other side effects
and get a "changed size during iteration" error. That is a lot of complexity
to keep in mind if this can be modified from several threads at the same
time.
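To illustrate the for-loop concern, here is a minimal single-threaded sketch. The dict contents and the function name are made up for illustration, and the write inside the loop is only a stand-in for what a second thread could do between two iterations; it is not code from this PR.

```python
def mutate_while_iterating() -> str:
    # Hypothetical stand-in for a shared dict such as event_buffer.
    event_buffer = {"task_a": "success", "task_b": "failed"}
    try:
        for key in event_buffer:
            # Simulates another thread inserting an entry mid-loop.
            event_buffer[key + "_retry"] = "queued"
    except RuntimeError as exc:
        # CPython detects the size change on the next iteration step.
        return str(exc)
    return "no error"


message = mutate_while_iterating()
print(message)
```

With real threads the failure would be intermittent rather than deterministic, which makes it harder to catch in review or tests.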
I'm not 100% sure, but I believe this piece could break,
https://github.com/apache/airflow/blob/65010fda091242870a410c65478eae362899763b/airflow/executors/base_executor.py#L334-L336,
if some thread modifies `event_buffer` while another thread is iterating the
loop, because it takes a snapshot of the keys before starting to pop them off.
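A rough sketch of the snapshot-then-pop hazard, under the assumption that the loop has this general shape. `drain` and `interleaved` are hypothetical names; the `interleaved` callback deterministically plays the role of a second thread that runs between the snapshot and the pops. This is not the actual `base_executor.py` code.

```python
from typing import Callable, Dict, Optional


def drain(
    event_buffer: Dict[str, str],
    interleaved: Optional[Callable[[Dict[str, str]], None]] = None,
) -> Dict[str, str]:
    cleared = {}
    keys = list(event_buffer.keys())  # snapshot of the keys
    if interleaved is not None:
        # Stand-in for another thread touching the dict after the snapshot.
        interleaved(event_buffer)
    for key in keys:
        # Raises KeyError if the key vanished since the snapshot was taken.
        cleared[key] = event_buffer.pop(key)
    return cleared


buffer = {"task_a": "success", "task_b": "failed"}
try:
    drain(buffer, interleaved=lambda b: b.pop("task_b"))
    outcome = "ok"
except KeyError as exc:
    outcome = f"KeyError: {exc}"
print(outcome)
```

The snapshot avoids the "changed size during iteration" error, but it does not protect the later `pop` calls from a concurrent removal.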
My suggestion is to isolate the multithreaded operations into purer
functions, then return their results on a queue that is consumed in the same
main thread that handles it today. Otherwise, this could benefit from more
documentation on which member properties must be protected by locks, or
which types of operations are allowed in each function.
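Roughly what I have in mind, as a sketch only. `interpret_result`, `process_results`, and the tuple shape of a result are all hypothetical and not taken from the executor; the point is just that worker threads never touch shared state and only the calling thread writes to the dict.

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Iterable, Tuple


def interpret_result(raw: Tuple[str, str]) -> Tuple[str, str]:
    """Pure function: reads its argument, touches no shared state."""
    key, state = raw
    return key, state.upper()


def process_results(
    raw_results: Iterable[Tuple[str, str]],
    event_buffer: Dict[str, str],
    max_workers: int = 4,
) -> None:
    handled: "queue.Queue[Tuple[str, str]]" = queue.Queue()

    def worker(raw: Tuple[str, str]) -> None:
        # Workers only put onto the thread-safe queue.
        handled.put(interpret_result(raw))

    # Leaving the with-block waits for all submitted work to finish.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for raw in raw_results:
            pool.submit(worker, raw)

    # Only the calling (main) thread mutates event_buffer.
    while True:
        try:
            key, state = handled.get_nowait()
        except queue.Empty:
            break
        event_buffer[key] = state


event_buffer: Dict[str, str] = {}
process_results([("task_a", "success"), ("task_b", "failed")], event_buffer)
print(event_buffer)
```

With this shape there is nothing to document about locks, because `event_buffer` is only ever written from one thread.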