dstandish commented on code in PR #26639:
URL: https://github.com/apache/airflow/pull/26639#discussion_r1056730524
##########
airflow/executors/kubernetes_executor.py:
##########
@@ -62,6 +64,50 @@
KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
+def multi_threads_queue_process(
+ queue_size: int,
+ queue_type: str,
+ process_method: Callable,
+ max_threads: int,
+ log: Logger,
+ batch_size: Optional[int] = None,
+) -> None:
+    """
+    Helper method to enable multiple threads for processing queues used with the Kubernetes executor.
+
+    :param queue_size: the size of the queue being processed
+    :param queue_type: the type of the queue
+    :param process_method: the method that actually processes the queue
+    :param max_threads: the maximum number of threads to use
+    :param log: the logger to use
+    :param batch_size: the maximum number of items to process in this round.
+        If not set, the current queue size is used.
+    """
+ if queue_size == 0:
+ log.info(f'There is no item to process in the {queue_type} queue.')
+ return
+
+ start_time = time.time()
+    log.info(f'Start processing {queue_type} queue with at most {max_threads} threads.')
+
+ batch_size = min(batch_size or queue_size, queue_size)
+ max_threads = min(max_threads, queue_size)
+
+ threads = []
+ quotient, remainder = divmod(batch_size, max_threads)
+ for i in range(max_threads):
+ sub_batch_size = quotient + 1 if i < remainder else quotient
+ t = Thread(target=process_method, args=[sub_batch_size])
+ threads.append(t)
+ t.start()
+ for t in threads:
+ t.join()
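For reference, the fixed split computed by the `divmod` call above distributes `batch_size` items across `max_threads` threads, giving the first `remainder` threads one extra item. A standalone sketch (`split_batch` is a made-up helper name, not part of the PR):

```python
def split_batch(batch_size: int, max_threads: int) -> list:
    # Mirror the diff's divmod logic: spread batch_size items across
    # max_threads threads, with the first `remainder` threads taking one extra.
    quotient, remainder = divmod(batch_size, max_threads)
    return [quotient + 1 if i < remainder else quotient for i in range(max_threads)]

print(split_batch(10, 3))  # -> [4, 3, 3]
print(split_batch(6, 3))   # -> [2, 2, 2]
```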
Review Comment:
but you see the issue i'm thinking of here? since you are defining fixed
sizes for each thread, one thread may finish more quickly than the others, so
it could do more work if the others are taking longer per-item.
so it would be more efficient if each thread just drew from the same queue.
BUT the complication here is that you need to limit the total number of
tasks processed because of that setting. i think one way to handle that is you
could create a new queue object for each batch with all items taken from
task_queue that you need to process in the batch. but maybe this is not
really gonna matter too much. and we needn't let perfect be the enemy of good.
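A rough sketch of that shared-queue idea (this is an illustration, not the PR's code; `drain_with_workers` and its parameters are hypothetical names, assuming tasks arrive in a `queue.Queue`):

```python
import queue
import threading
from typing import Callable

def drain_with_workers(task_queue, worker: Callable, max_threads: int, batch_size: int) -> None:
    # Cap the batch: move at most batch_size items into a per-batch queue,
    # so the total processed this round stays bounded.
    batch = queue.Queue()
    moved = 0
    while moved < batch_size:
        try:
            batch.put(task_queue.get_nowait())
            moved += 1
        except queue.Empty:
            break

    def drain() -> None:
        # Every thread pulls from the same batch queue until it is empty,
        # so a fast thread naturally picks up extra work.
        while True:
            try:
                item = batch.get_nowait()
            except queue.Empty:
                return
            worker(item)

    threads = [threading.Thread(target=drain) for _ in range(min(max_threads, moved))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

The per-batch queue keeps the cap semantics while letting threads balance work dynamically instead of being assigned fixed sub-batch sizes up front.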
other thing i'll mention is.... k8s has support for async requests... not
asyncio, but threads... it manages its own thread pool... you use it by adding
`async_req=True` to the request. thread count is configured in the client. but
refactoring to this would be much more involved so i'm not suggesting it, just
want to mention it.
anyway, i'll keep watching this one closely. i have been a bit busy over the
last month with holidays and illness and my personal projects but i'm excited
that y'all are looking to get more involved and am committed to helping you see
this one through and encouraging any others coming in the future. thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]