hterik commented on code in PR #26639:
URL: https://github.com/apache/airflow/pull/26639#discussion_r1071796755


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -599,8 +655,31 @@ def sync(self) -> None:
             raise AirflowException(NOT_STARTED_MESSAGE)
         self.kube_scheduler.sync()
 
-        last_resource_version = None
-        while True:
+        """processing result queue"""
+        multi_threads_queue_process(
+            queue_size=self.result_queue.qsize(),
+            queue_type='result',
+            process_method=self.process_result_queue,

Review Comment:
   Today they might only use the `[]` and `in` operators, which in isolation 
are thread-safe, but tomorrow someone might add a for-loop or other side 
effects, giving you a "dictionary changed size during iteration" error. That 
is a lot of complexity to keep in mind if this can be modified from several 
threads at the same time.
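   
   A minimal sketch of the failure mode I mean, simulated in a single thread 
so it is deterministic. The dict and function names are hypothetical 
stand-ins, not the executor's real members:

```python
def iterate_while_mutating() -> str:
    # Hypothetical stand-in for state shared between threads.
    shared = {i: i for i in range(5)}
    try:
        for key in shared:
            # Simulates a write from another thread landing mid-iteration.
            shared[key + 100] = key
    except RuntimeError as exc:
        return str(exc)
    return "no error"


message = iterate_while_mutating()
print(message)
```

   With real threads the same error would only show up intermittently, which 
makes it much harder to track down.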
   
   I'm not 100% sure, but I believe this piece could break
   
https://github.com/apache/airflow/blob/65010fda091242870a410c65478eae362899763b/airflow/executors/base_executor.py#L334-L336
if some thread modifies `event_buffer` while another thread is iterating the 
loop, because it takes a snapshot of the keys before starting to pop them off.
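   
   One way this could go wrong, sketched under the assumption that the loop 
snapshots the keys and then pops each one. I have not reproduced it against 
the executor itself; the buffer contents are made up and the concurrent 
removal is simulated in a single thread:

```python
# Hypothetical buffer contents, not real executor state.
event_buffer = {"task_a": "success", "task_b": "failed"}

# The loop takes a snapshot of the keys before popping them off.
keys_snapshot = list(event_buffer.keys())

# Simulate another thread removing an entry after the snapshot was taken.
del event_buffer["task_b"]

cleared = {}
missing = []
for key in keys_snapshot:
    try:
        cleared[key] = event_buffer.pop(key)
    except KeyError:
        # Without this guard, the stale key would crash the loop.
        missing.append(key)

print(cleared, missing)
```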
   
   My suggestion is to isolate the multithreaded operations into purer 
functions, then return their results on a queue that is consumed in the same 
main thread that handles them today. Otherwise, this could benefit from more 
documentation on which member properties must be protected by locks, or which 
types of operations are allowed in each function.
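
   Roughly what I have in mind, as a sketch rather than a drop-in change. 
`process_item`, `run`, and the local `event_buffer` are hypothetical names; 
the point is only that workers return values and the main thread is the sole 
writer of shared state:

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple


def process_item(item: int) -> Tuple[int, str]:
    """Pure worker: reads its argument and returns a result, nothing else."""
    return item, f"processed-{item}"


def run(items: List[int]) -> Dict[int, str]:
    results: queue.Queue = queue.Queue()

    # Workers only compute and enqueue; they never touch shared dicts.
    with ThreadPoolExecutor(max_workers=4) as pool:
        for item in items:
            pool.submit(lambda i=item: results.put(process_item(i)))
    # Leaving the `with` block waits for every submitted task to finish.

    # Only the calling (main) thread mutates the buffer, so no locks are needed.
    event_buffer: Dict[int, str] = {}
    while not results.empty():
        key, state = results.get()
        event_buffer[key] = state
    return event_buffer


print(run([1, 2, 3]))
```

   The cost is one extra hop through the queue, but the state mutations stay 
single-threaded and the existing handling code would not need to change.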



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
