dstandish commented on code in PR #26639:
URL: https://github.com/apache/airflow/pull/26639#discussion_r1056073449
##########
airflow/executors/kubernetes_executor.py:
##########
@@ -62,6 +64,50 @@
KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
+def multi_threads_queue_process(
+ queue_size: int,
+ queue_type: str,
+ process_method: Callable,
+ max_threads: int,
+ log: Logger,
+ batch_size: Optional[int] = None,
+) -> None:
+ """
+ Helper method to enable multi-threads for processing queues used with
kubernetes executor
+ :param queue_size: the size of the queue getting processed
+ :param queue_type: the type of the queue
+ :param process_method: the real method processing the queue
+ :param max_threads: the max num of threads to be used
+ :param log: log
+ :param batch_size: the max num of items we want to process in this round.
+ If it's not set, the current queue size will be used.
+ """
+ if queue_size == 0:
+ log.info(f'There is no item to process in the {queue_type} queue.')
+ return
+
+ start_time = time.time()
+ log.info(f'Start processing {queue_type} queue with at most {max_threads}
threads.')
+
+ batch_size = min(batch_size or queue_size, queue_size)
+ max_threads = min(max_threads, queue_size)
+
+ threads = []
+ quotient, remainder = divmod(batch_size, max_threads)
+ for i in range(max_threads):
+ sub_batch_size = quotient + 1 if i < remainder else quotient
+ t = Thread(target=process_method, args=[sub_batch_size])
+ threads.append(t)
+ t.start()
+ for t in threads:
+ t.join()
Review Comment:
if you want to do this manually, at least i would separate the batch
construction logic from the thread creation logic e.g. you could do this
```
def get_batches(
queue_size=100,
batch_size=10,
max_threads=3,
):
batch_size = min(batch_size or queue_size, queue_size)
max_threads = min(max_threads, queue_size)
quotient, remainder = divmod(batch_size, max_threads)
for i in range(max_threads):
sub_batch_size = quotient + 1 if i < remainder else quotient
yield sub_batch_size
threads = []
for num in get_batches():
t = Thread(target=process_method, args=[num])
threads.append(t)
t.start()
for t in threads:
t.join()
```
or something
it's not obvious what this batch definition logic is supposed to do
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]