kaxil commented on code in PR #44427:
URL: https://github.com/apache/airflow/pull/44427#discussion_r1861058287
##########
airflow/executors/local_executor.py:
##########
@@ -83,88 +75,47 @@ def _run_worker(
)
break
- if item is None:
+ if workload is None:
# Received poison pill, no more tasks to run
return
# Decrement this as soon as we pick up a message off the queue
with unread_messages:
unread_messages.value -= 1
+ key = None
+ if ti := getattr(workload, "ti", None):
+ key = ti.key
+ else:
+ raise TypeError(f"Don't know how to get ti key from
{type(workload).__name__}")
- (key, command) = item
try:
- state = _execute_work(log, key, command)
+ _execute_work(log, workload)
- output.put((key, state, None))
+ output.put((key, TaskInstanceState.SUCCESS, None))
except Exception as e:
+ log.exception("uhoh")
output.put((key, TaskInstanceState.FAILED, e))
-def _execute_work(log: logging.Logger, key: TaskInstanceKey, command:
CommandType) -> TaskInstanceState:
+def _execute_work(log: logging.Logger, workload: workloads.ExecuteTask) ->
None:
"""
Execute command received and stores result state in queue.
:param key: the key to identify the task instance
:param command: the command to execute
"""
- setproctitle(f"airflow worker -- LocalExecutor: {command}")
- dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command)
- try:
- with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
- if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
- return _execute_work_in_subprocess(log, command)
- else:
- return _execute_work_in_fork(log, command)
- finally:
- # Remove the command since the worker is done executing the task
- setproctitle("airflow worker -- LocalExecutor: <idle>")
-
-
-def _execute_work_in_subprocess(log: logging.Logger, command: CommandType) ->
TaskInstanceState:
- try:
- subprocess.check_call(command, close_fds=True)
- return TaskInstanceState.SUCCESS
- except subprocess.CalledProcessError as e:
- log.error("Failed to execute task %s.", e)
- return TaskInstanceState.FAILED
-
-
-def _execute_work_in_fork(log: logging.Logger, command: CommandType) ->
TaskInstanceState:
- pid = os.fork()
- if pid:
- # In parent, wait for the child
- pid, ret = os.waitpid(pid, 0)
- return TaskInstanceState.SUCCESS if ret == 0 else
TaskInstanceState.FAILED
-
- from airflow.sentry import Sentry
-
- ret = 1
- try:
- import signal
-
- from airflow.cli.cli_parser import get_parser
-
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGUSR2, signal.SIG_DFL)
+ from airflow.sdk.execution_time.supervisor import supervise
- parser = get_parser()
- # [1:] - remove "airflow" from the start of the command
- args = parser.parse_args(command[1:])
- args.shut_down_logging = False
-
- setproctitle(f"airflow task supervisor: {command}")
-
- args.func(args)
- ret = 0
- return TaskInstanceState.SUCCESS
- except Exception as e:
- log.exception("Failed to execute task %s.", e)
- return TaskInstanceState.FAILED
- finally:
- Sentry.flush()
- logging.shutdown()
- os._exit(ret)
+ setproctitle(f"airflow worker -- LocalExecutor: {workload.ti.id}")
+ # This will return the exit code of the task process, but we don't care
about that, just if the
+ # _supervisor_ had an error reporting the state back (which will result in
an exception.)
+ supervise(
+ ti=workload.ti,
+ dag_path=workload.dag_path,
+ token=workload.token,
+ server="http://localhost:9091/execution/",
Review Comment:
Probably, but the `port` could change if other services are running on the same box?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]