jscheffl commented on code in PR #44427:
URL: https://github.com/apache/airflow/pull/44427#discussion_r1861212992


##########
airflow/executors/local_executor.py:
##########
@@ -83,88 +75,47 @@ def _run_worker(
             )
             break
 
-        if item is None:
+        if workload is None:
             # Received poison pill, no more tasks to run
             return
 
         # Decrement this as soon as we pick up a message off the queue
         with unread_messages:
             unread_messages.value -= 1
+        key = None
+        if ti := getattr(workload, "ti", None):
+            key = ti.key
+        else:
+            raise TypeError(f"Don't know how to get ti key from {type(workload).__name__}")
 
-        (key, command) = item
         try:
-            state = _execute_work(log, key, command)
+            _execute_work(log, workload)
 
-            output.put((key, state, None))
+            output.put((key, TaskInstanceState.SUCCESS, None))
         except Exception as e:
+            log.exception("uhoh")
             output.put((key, TaskInstanceState.FAILED, e))
 
 
-def _execute_work(log: logging.Logger, key: TaskInstanceKey, command: CommandType) -> TaskInstanceState:
+def _execute_work(log: logging.Logger, workload: workloads.ExecuteTask) -> None:
     """
     Execute command received and stores result state in queue.
 
     :param key: the key to identify the task instance
     :param command: the command to execute
     """
-    setproctitle(f"airflow worker -- LocalExecutor: {command}")
-    dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command)
-    try:
-        with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
-            if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
-                return _execute_work_in_subprocess(log, command)
-            else:
-                return _execute_work_in_fork(log, command)
-    finally:
-        # Remove the command since the worker is done executing the task
-        setproctitle("airflow worker -- LocalExecutor: <idle>")
-
-
-def _execute_work_in_subprocess(log: logging.Logger, command: CommandType) -> TaskInstanceState:
-    try:
-        subprocess.check_call(command, close_fds=True)
-        return TaskInstanceState.SUCCESS
-    except subprocess.CalledProcessError as e:
-        log.error("Failed to execute task %s.", e)
-        return TaskInstanceState.FAILED
-
-
-def _execute_work_in_fork(log: logging.Logger, command: CommandType) -> TaskInstanceState:
-    pid = os.fork()
-    if pid:
-        # In parent, wait for the child
-        pid, ret = os.waitpid(pid, 0)
-        return TaskInstanceState.SUCCESS if ret == 0 else TaskInstanceState.FAILED
-
-    from airflow.sentry import Sentry
-
-    ret = 1
-    try:
-        import signal
-
-        from airflow.cli.cli_parser import get_parser
-
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGUSR2, signal.SIG_DFL)
+    from airflow.sdk.execution_time.supervisor import supervise
 
-        parser = get_parser()
-        # [1:] - remove "airflow" from the start of the command
-        args = parser.parse_args(command[1:])
-        args.shut_down_logging = False
-
-        setproctitle(f"airflow task supervisor: {command}")
-
-        args.func(args)
-        ret = 0
-        return TaskInstanceState.SUCCESS
-    except Exception as e:
-        log.exception("Failed to execute task %s.", e)
-        return TaskInstanceState.FAILED
-    finally:
-        Sentry.flush()
-        logging.shutdown()
-        os._exit(ret)
+    setproctitle(f"airflow worker -- LocalExecutor: {workload.ti.id}")
+    # This will return the exit code of the task process, but we don't care about that, just if the
+    # _supervisor_ had an error reporting the state back (which will result in an exception.)
+    supervise(
+        ti=workload.ti,
+        dag_path=workload.dag_path,
+        token=workload.token,
+        server="http://localhost:9091/execution/",

Review Comment:
   Is `conf.getint()` not suitable for this?
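
   (For illustration only, not from the PR: a minimal sketch of what the suggestion might look like, assuming Airflow's `conf` API; the `[core] execution_api_server_port` option name is a hypothetical placeholder, not an existing config key.)

   ```python
   from airflow.configuration import conf

   # Hypothetical option, shown only to illustrate reading the port via
   # conf.getint() instead of hardcoding it in the server URL.
   port = conf.getint("core", "execution_api_server_port", fallback=9091)
   server = f"http://localhost:{port}/execution/"
   ```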



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to