ashb commented on code in PR #23944:
URL: https://github.com/apache/airflow/pull/23944#discussion_r884612358
##########
airflow/executors/local_executor.py:
##########
@@ -201,161 +124,27 @@ class LocalExecutor(BaseExecutor):
:param parallelism: how many parallel processes are run in the executor
"""
+ futures_executor: Optional[concurrent.futures.Executor]
+ futures: Dict[concurrent.futures.Future, TaskInstanceKey]
+
def __init__(self, parallelism: int = PARALLELISM):
super().__init__(parallelism=parallelism)
if self.parallelism < 0:
raise AirflowException("parallelism must be bigger than or equal
to 0")
- self.manager: Optional[SyncManager] = None
- self.result_queue: Optional['Queue[TaskInstanceStateType]'] = None
- self.workers: List[QueuedLocalWorker] = []
- self.workers_used: int = 0
- self.workers_active: int = 0
- self.impl: Optional[
- Union['LocalExecutor.UnlimitedParallelism',
'LocalExecutor.LimitedParallelism']
- ] = None
-
- class UnlimitedParallelism:
- """
- Implements LocalExecutor with unlimited parallelism, starting one
process
- per each command to execute.
-
- :param executor: the executor instance to implement.
- """
-
- def __init__(self, executor: 'LocalExecutor'):
- self.executor: 'LocalExecutor' = executor
-
- def start(self) -> None:
- """Starts the executor."""
- self.executor.workers_used = 0
- self.executor.workers_active = 0
-
- def execute_async(
- self,
- key: TaskInstanceKey,
- command: CommandType,
- queue: Optional[str] = None,
- executor_config: Optional[Any] = None,
- ) -> None:
- """
- Executes task asynchronously.
-
- :param key: the key to identify the task instance
- :param command: the command to execute
- :param queue: Name of the queue
- :param executor_config: configuration for the executor
- """
- if not self.executor.result_queue:
- raise AirflowException(NOT_STARTED_MESSAGE)
- local_worker = LocalWorker(self.executor.result_queue, key=key,
command=command)
- self.executor.workers_used += 1
- self.executor.workers_active += 1
- local_worker.start()
-
- def sync(self) -> None:
- """Sync will get called periodically by the heartbeat method."""
- if not self.executor.result_queue:
- raise AirflowException("Executor should be started first")
- while not self.executor.result_queue.empty():
- results = self.executor.result_queue.get()
- self.executor.change_state(*results)
- self.executor.workers_active -= 1
-
- def end(self) -> None:
- """
- This method is called when the caller is done submitting job and
- wants to wait synchronously for the job submitted previously to be
- all done.
- """
- while self.executor.workers_active > 0:
- self.executor.sync()
-
- class LimitedParallelism:
- """
- Implements LocalExecutor with limited parallelism using a task queue to
- coordinate work distribution.
-
- :param executor: the executor instance to implement.
- """
-
- def __init__(self, executor: 'LocalExecutor'):
- self.executor: 'LocalExecutor' = executor
- self.queue: Optional['Queue[ExecutorWorkType]'] = None
-
- def start(self) -> None:
- """Starts limited parallelism implementation."""
- if not self.executor.manager:
- raise AirflowException(NOT_STARTED_MESSAGE)
- self.queue = self.executor.manager.Queue()
- if not self.executor.result_queue:
- raise AirflowException(NOT_STARTED_MESSAGE)
- self.executor.workers = [
- QueuedLocalWorker(self.queue, self.executor.result_queue)
- for _ in range(self.executor.parallelism)
- ]
-
- self.executor.workers_used = len(self.executor.workers)
-
- for worker in self.executor.workers:
- worker.start()
-
- def execute_async(
- self,
- key: TaskInstanceKey,
- command: CommandType,
- queue: Optional[str] = None,
- executor_config: Optional[Any] = None,
- ) -> None:
- """
- Executes task asynchronously.
-
- :param key: the key to identify the task instance
- :param command: the command to execute
- :param queue: name of the queue
- :param executor_config: configuration for the executor
- """
- if not self.queue:
- raise AirflowException(NOT_STARTED_MESSAGE)
- self.queue.put((key, command))
-
- def sync(self):
- """Sync will get called periodically by the heartbeat method."""
- while True:
- try:
- results = self.executor.result_queue.get_nowait()
- try:
- self.executor.change_state(*results)
- finally:
- self.executor.result_queue.task_done()
- except Empty:
- break
-
- def end(self):
- """Ends the executor. Sends the poison pill to all workers."""
- for _ in self.executor.workers:
- self.queue.put((None, None))
-
- # Wait for commands to finish
- self.queue.join()
- self.executor.sync()
+ self.futures = {}
def start(self) -> None:
"""Starts the executor"""
old_proctitle = getproctitle()
setproctitle("airflow executor -- LocalExecutor")
- self.manager = Manager()
setproctitle(old_proctitle)
- self.result_queue = self.manager.Queue()
- self.workers = []
- self.workers_used = 0
- self.workers_active = 0
- self.impl = (
- LocalExecutor.UnlimitedParallelism(self)
- if self.parallelism == 0
- else LocalExecutor.LimitedParallelism(self)
- )
+ # This isn't _truly_ unlimited, but it should be good enough!
+ size = 2**30 if self.parallelism == 0 else self.parallelism
Review Comment:
Huh. TIL
Sadly, I think that although this isn't documented, it's still "relied upon"
behaviour, so we can't just remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]