ashb commented on code in PR #23944:
URL: https://github.com/apache/airflow/pull/23944#discussion_r1850509324
##########
airflow/executors/local_executor.py:
##########
@@ -25,192 +25,139 @@
from __future__ import annotations
-import contextlib
import logging
+import multiprocessing
import os
import subprocess
-from abc import abstractmethod
-from multiprocessing import Manager, Process
-from queue import Empty
+from multiprocessing import Queue, SimpleQueue
from typing import TYPE_CHECKING, Any, Optional, Tuple
-from setproctitle import getproctitle, setproctitle
-
from airflow import settings
-from airflow.exceptions import AirflowException
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
-from airflow.traces.tracer import Trace, add_span
+from airflow.traces.tracer import add_span
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
- from multiprocessing.managers import SyncManager
- from queue import Queue
-
from airflow.executors.base_executor import CommandType
- from airflow.models.taskinstance import TaskInstanceStateType
from airflow.models.taskinstancekey import TaskInstanceKey
# This is a work to be executed by a worker.
# It can Key and Command - but it can also be None, None which is actually
a
# "Poison Pill" - worker seeing Poison Pill should take the pill and ...
die instantly.
- ExecutorWorkType = Tuple[Optional[TaskInstanceKey], Optional[CommandType]]
+ ExecutorWorkType = Optional[Tuple[TaskInstanceKey, CommandType]]
+ TaskInstanceStateType = Tuple[TaskInstanceKey, TaskInstanceState,
Optional[Exception]]
-class LocalWorkerBase(Process, LoggingMixin):
- """
- LocalWorkerBase implementation to run airflow commands.
+def _run_worker(logger_name: str, input: SimpleQueue[ExecutorWorkType],
output: Queue[TaskInstanceStateType]):
+ import signal
- Executes the given command and puts the result into a result queue when
done, terminating execution.
+ from setproctitle import setproctitle
Review Comment:
Now that I think about it, I'm not sure this actually makes any difference:
this is in the multiprocessing (mp) process, but it's about to fork a new
process anyway. I'll move it back.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]