kaxil commented on code in PR #44427:
URL: https://github.com/apache/airflow/pull/44427#discussion_r1861096740

##########
airflow/executors/local_executor.py:
##########
@@ -83,88 +75,47 @@ def _run_worker(
             )
             break

-        if item is None:
+        if workload is None:
             # Received poison pill, no more tasks to run
             return

         # Decrement this as soon as we pick up a message off the queue
         with unread_messages:
             unread_messages.value -= 1

+        key = None
+        if ti := getattr(workload, "ti", None):
+            key = ti.key
+        else:
+            raise TypeError(f"Don't know how to get ti key from {type(workload).__name__}")
+
-        (key, command) = item
         try:
-            state = _execute_work(log, key, command)
+            _execute_work(log, workload)

-            output.put((key, state, None))
+            output.put((key, TaskInstanceState.SUCCESS, None))
         except Exception as e:
+            log.exception("uhoh")
             output.put((key, TaskInstanceState.FAILED, e))


-def _execute_work(log: logging.Logger, key: TaskInstanceKey, command: CommandType) -> TaskInstanceState:
+def _execute_work(log: logging.Logger, workload: workloads.ExecuteTask) -> None:
     """
     Execute command received and stores result state in queue.

     :param key: the key to identify the task instance
     :param command: the command to execute
     """
-    setproctitle(f"airflow worker -- LocalExecutor: {command}")
-    dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command)
-    try:
-        with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
-            if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
-                return _execute_work_in_subprocess(log, command)
-            else:
-                return _execute_work_in_fork(log, command)
-    finally:
-        # Remove the command since the worker is done executing the task
-        setproctitle("airflow worker -- LocalExecutor: <idle>")
-
-
-def _execute_work_in_subprocess(log: logging.Logger, command: CommandType) -> TaskInstanceState:
-    try:
-        subprocess.check_call(command, close_fds=True)
-        return TaskInstanceState.SUCCESS
-    except subprocess.CalledProcessError as e:
-        log.error("Failed to execute task %s.", e)
-        return TaskInstanceState.FAILED
-
-
-def _execute_work_in_fork(log: logging.Logger, command: CommandType) -> TaskInstanceState:
-    pid = os.fork()
-    if pid:
-        # In parent, wait for the child
-        pid, ret = os.waitpid(pid, 0)
-        return TaskInstanceState.SUCCESS if ret == 0 else TaskInstanceState.FAILED
-
-    from airflow.sentry import Sentry
-
-    ret = 1
-    try:
-        import signal
-
-        from airflow.cli.cli_parser import get_parser
-
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGUSR2, signal.SIG_DFL)
+    from airflow.sdk.execution_time.supervisor import supervise

-        parser = get_parser()
-        # [1:] - remove "airflow" from the start of the command
-        args = parser.parse_args(command[1:])
-        args.shut_down_logging = False
-
-        setproctitle(f"airflow task supervisor: {command}")
-
-        args.func(args)
-        ret = 0
-        return TaskInstanceState.SUCCESS
-    except Exception as e:
-        log.exception("Failed to execute task %s.", e)
-        return TaskInstanceState.FAILED
-    finally:
-        Sentry.flush()
-        logging.shutdown()
-        os._exit(ret)
+    setproctitle(f"airflow worker -- LocalExecutor: {workload.ti.id}")
+    # This will return the exit code of the task process, but we don't care about that, just if the
+    # _supervisor_ had an error reporting the state back (which will result in an exception.)
+    supervise(
+        ti=workload.ti,
+        dag_path=workload.dag_path,
+        token=workload.token,
+        server="http://localhost:9091/execution/",

Review Comment:
   airflow.cfg is fine for now? Worst case, an env var for now -- to be changed later?
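If the hardcoded `server` URL were moved into airflow.cfg as the review suggests, the call site could look roughly like the sketch below. This is a minimal sketch, not part of this PR: the `core` section and `execution_api_server_url` key are hypothetical placeholder names. Because Airflow's `conf.get()` consults the matching `AIRFLOW__{SECTION}__{KEY}` environment variable before the cfg file, an env-var override would come for free with this approach.

```python
from airflow.configuration import conf

# Hypothetical section/key names -- an AIRFLOW__CORE__EXECUTION_API_SERVER_URL
# env var would take precedence over the cfg value automatically via conf.get().
server_url = conf.get(
    "core",
    "execution_api_server_url",
    fallback="http://localhost:9091/execution/",
)

supervise(
    ti=workload.ti,
    dag_path=workload.dag_path,
    token=workload.token,
    server=server_url,
)
```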