dstandish commented on code in PR #26639:
URL: https://github.com/apache/airflow/pull/26639#discussion_r1056730524


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -62,6 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
+def multi_threads_queue_process(
+    queue_size: int,
+    queue_type: str,
+    process_method: Callable,
+    max_threads: int,
+    log: Logger,
+    batch_size: Optional[int] = None,
+) -> None:
+    """
+    Helper method to enable multi-threads for processing queues used with kubernetes executor
+    :param queue_size: the size of the queue getting processed
+    :param queue_type: the type of the queue
+    :param process_method: the real method processing the queue
+    :param max_threads: the max num of threads to be used
+    :param log: log
+    :param batch_size: the max num of items we want to process in this round.
+                       If it's not set, the current queue size will be used.
+    """
+    if queue_size == 0:
+        log.info(f'There is no item to process in the {queue_type} queue.')
+        return
+
+    start_time = time.time()
+    log.info(f'Start processing {queue_type} queue with at most {max_threads} threads.')
+
+    batch_size = min(batch_size or queue_size, queue_size)
+    max_threads = min(max_threads, queue_size)
+
+    threads = []
+    quotient, remainder = divmod(batch_size, max_threads)
+    for i in range(max_threads):
+        sub_batch_size = quotient + 1 if i < remainder else quotient
+        t = Thread(target=process_method, args=[sub_batch_size])
+        threads.append(t)
+        t.start()
+    for t in threads:
+        t.join()

Review Comment:
   but you see the issue i'm thinking of here? since you are defining fixed 
sizes for each thread, one thread may finish more quickly than the others, so 
it could do more work if the others are taking longer per-item.
   
   so it would be more efficient if each thread just drew from the same queue.
   
   BUT the complication here is that you need to limit the total number of 
tasks processed because of that setting.  i think one way to handle that is you 
could create a new queue object for each batch with all items taken from 
task_queue that you need to process in the batch.  but maybe this is not 
really gonna matter too much.  and we needn't let perfect be enemy of good.
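   
   a minimal sketch of that per-batch queue idea, to make it concrete. the 
   names here (`process_batch`, `process_item`) are hypothetical and not from 
   the PR; it only assumes `task_queue` behaves like a standard `queue.Queue`. 
   each thread pulls from the same batch queue, so a fast thread naturally picks 
   up more items, while the total for the round stays capped at `batch_size`.

```python
import queue
import threading
from typing import Any, Callable


def process_batch(
    task_queue: "queue.Queue[Any]",
    process_item: Callable[[Any], None],
    max_threads: int,
    batch_size: int,
) -> int:
    """Process at most batch_size items from task_queue; return the count taken."""
    # Move up to batch_size items into a queue private to this round, so the
    # cap on total work holds regardless of which thread handles which item.
    batch: "queue.Queue[Any]" = queue.Queue()
    moved = 0
    while moved < batch_size:
        try:
            batch.put(task_queue.get_nowait())
        except queue.Empty:
            break
        moved += 1

    def worker() -> None:
        # Every worker draws from the shared batch queue until it is empty.
        while True:
            try:
                item = batch.get_nowait()
            except queue.Empty:
                return
            process_item(item)

    # No point starting more threads than there are items.
    threads = [threading.Thread(target=worker) for _ in range(min(max_threads, moved))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return moved
```

   note this sketch does no error handling: an exception in `process_item` 
   ends that worker, and the remaining items are left for the other threads.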
   
   other thing i'll mention is....  k8s has support for async requests... not 
asyncio, but threads... it manages its own thread pool... you use it by adding 
`async_req=True` to request.  thread count is configured in client.  but 
refactoring to this would be much more involved so not suggesting it, but just 
want to mention.
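   
   for illustration only, the general shape of that pattern: submit each 
   request to a thread pool, get a handle back right away, and collect results 
   later with `.get()`. this sketch does not use the kubernetes client at all; 
   `create_pod` is a hypothetical stand-in for a blocking API call and the pool 
   is a plain `multiprocessing.pool.ThreadPool`, so treat it as an analogy for 
   what `async_req=True` gives you, not as the client's actual API.

```python
from multiprocessing.pool import ThreadPool
from typing import List


def create_pod(name: str) -> str:
    """Hypothetical stand-in for a blocking request to the k8s API."""
    return f"created {name}"


def create_pods_async(names: List[str], pool_threads: int) -> List[str]:
    """Submit every request to the pool first, then wait for all results."""
    with ThreadPool(processes=pool_threads) as pool:
        # apply_async returns immediately with an AsyncResult handle.
        pending = [pool.apply_async(create_pod, (name,)) for name in names]
        # .get() blocks until that particular request has finished.
        return [p.get() for p in pending]
```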
   
   anyway, i'll keep watching this one closely.  i have been a bit busy over 
last month with holidays and illness and my personal projects but i'm excited 
that y'all are looking to get more involved and am committed to helping you see 
this one through and encouraging any others coming in the future. thanks.
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
