ashb commented on code in PR #43893:
URL: https://github.com/apache/airflow/pull/43893#discussion_r1840301201


##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -0,0 +1,552 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Supervise and run Tasks in a subprocess."""
+
+from __future__ import annotations
+
+import atexit
+import io
+import logging
+import os
+import selectors
+import signal
+import sys
+import time
+import weakref
+from collections.abc import Generator
+from contextlib import suppress
+from datetime import datetime, timezone
+from socket import socket, socketpair
+from typing import TYPE_CHECKING, BinaryIO, Callable, ClassVar, Literal, 
NoReturn, cast, overload
+from uuid import UUID
+
+import attrs
+import httpx
+import msgspec
+import psutil
+import structlog
+
+from airflow.sdk.api.client import Client
+from airflow.sdk.api.datamodels._generated import TaskInstanceState
+from airflow.sdk.execution_time.comms import ConnectionResponse, 
GetConnection, StartupDetails, ToSupervisor
+
+if TYPE_CHECKING:
+    from structlog.typing import FilteringBoundLogger
+
+    from airflow.sdk.api.datamodels.activities import ExecuteTaskActivity
+    from airflow.sdk.api.datamodels.ti import TaskInstance
+
+
+# Public names of this module (what ``from ... import *`` exposes).
+__all__ = ["WatchedSubprocess", "supervise"]
+
+# Module-level structlog logger, bound with logger_name="supervisor".
+log: FilteringBoundLogger = structlog.get_logger(logger_name="supervisor")
+
+# TODO: Pull this from config
+# NOTE(review): presumably the longest allowed gap between heartbeats, in seconds -- the code that
+# reads this constant is not visible here, confirm the unit against its usage.
+SLOWEST_HEARTBEAT_INTERVAL: int = 30
+# Don't heartbeat more often than this
+FASTEST_HEARTBEAT_INTERVAL: int = 5
+
+
+@overload
+def mkpipe() -> tuple[socket, socket]: ...
+
+
+@overload
+def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ...
+
+
+def mkpipe(
+    remote_read: bool = False,
+) -> tuple[socket, socket | BinaryIO]:
+    """
+    Create a pair of connected sockets.
+
+    The inheritable flag will be set correctly so that the end destined for the subprocess is kept open but
+    the end for this process is closed automatically by the OS.
+
+    :param remote_read: When true, the subprocess gets ``rsock`` and this process keeps ``wsock``,
+        which is returned wrapped as an unbuffered binary file object. When false (the default), the
+        subprocess gets ``wsock`` and this process keeps ``rsock``, returned as a plain socket.
+    :return: ``(remote, local)`` -- the subprocess end first, then this process's end. The local socket
+        is switched to non-blocking mode before it is returned or wrapped.
+    """
+    rsock, wsock = socketpair()
+    # Pick which half goes to the child depending on who is meant to read.
+    local, remote = (wsock, rsock) if remote_read else (rsock, wsock)
+
+    # Only the child's end is marked inheritable; the local end is left as created.
+    remote.set_inheritable(True)
+    local.setblocking(False)
+
+    # NOTE: within this function the name ``io`` shadows the module-level ``import io``.
+    io: BinaryIO | socket
+    if remote_read:
+        # If _we_ are writing, we don't want to buffer
+        io = cast(BinaryIO, local.makefile("wb", buffering=0))
+    else:
+        io = local
+
+    return remote, io
+
+
+def _subprocess_main():
+    """Import the task runner's ``main`` and call it."""
+    # The import is done inside the function, so it only happens when this function is called.
+    from airflow.sdk.execution_time.task_runner import main
+
+    main()
+
+
+def _fork_main(
+    child_stdin: socket,
+    child_stdout: socket,
+    child_stderr: socket,
+    log_fd: int,
+    target: Callable[[], None],
+) -> NoReturn:
+    """
+    Set up the forked process, run ``target`` and terminate the process with ``os._exit``.
+
+    Setup resets ``sys.excepthook`` and the SIGINT/SIGUSR2 handlers to their defaults, optionally
+    configures logging onto ``log_fd``, and re-points ``sys.stdin``/``sys.stdout``/``sys.stderr`` (and
+    their underlying file descriptors) at the given sockets.
+
+    Exit status:
+
+    * ``0`` -- ``target`` returned normally.
+    * the ``SystemExit`` code -- ``target`` raised ``SystemExit`` with an ``int`` code.
+    * ``1`` -- ``target`` raised ``SystemExit`` with any non-``int`` code; a truthy code is printed to
+      stderr first.
+    * ``99`` -- ``target`` raised another ``Exception``; the traceback is written to the stderr handle
+      captured before redirection.
+    * ``98`` -- writing that traceback itself failed.
+
+    :param child_stdin: Socket whose file descriptor becomes the process's stdin.
+    :param child_stdout: Socket whose file descriptor becomes the process's stdout.
+    :param child_stderr: Socket whose file descriptor becomes the process's stderr.
+    :param log_fd: File descriptor for the logging output channel; only used when greater than 0.
+    :param target: Callable to run once setup is complete.
+    """
+    # TODO: Make this process a session leader
+
+    # Uninstall the rich etc. exception handler
+    sys.excepthook = sys.__excepthook__
+    signal.signal(signal.SIGINT, signal.SIG_DFL)
+    signal.signal(signal.SIGUSR2, signal.SIG_DFL)
+
+    if log_fd > 0:
+        # A channel that the task can send JSON-formatted logs over.
+        #
+        # JSON logs sent this way will be handled nicely
+        from airflow.sdk.log import configure_logging
+
+        # Unbuffered binary writer on the fd, handed to the logging configuration as its output.
+        log_io = os.fdopen(log_fd, "wb", buffering=0)
+        configure_logging(enable_pretty_log=False, output=log_io)
+
+    # Captured before sys.stderr is replaced below; used by exit() and the last-chance handler.
+    last_chance_stderr = sys.__stderr__ or sys.stderr
+
+    if "PYTEST_CURRENT_TEST" in os.environ:
+        # When we are running in pytest, its output capturing messes us up. 
This works around it
+        sys.stdout = sys.__stdout__
+        sys.stderr = sys.__stderr__
+
+    # Ensure that sys.stdout et al (and the underlying filehandles for C 
libraries etc) are connected to the
+    # pipes from the supervisor
+
+    # Each entry: (attribute name on sys, socket to attach, open mode, close the old handle afterwards?)
+    # NOTE(review): stderr is the only handle not closed -- presumably because last_chance_stderr may
+    # be that same object; confirm.
+    for handle_name, sock, mode, close in (
+        ("stdin", child_stdin, "r", True),
+        ("stdout", child_stdout, "w", True),
+        ("stderr", child_stderr, "w", False),
+    ):
+        handle = getattr(sys, handle_name)
+        try:
+            fd = handle.fileno()
+            # Make the existing descriptor number refer to the socket.
+            os.dup2(sock.fileno(), fd)
+            if close:
+                handle.close()
+        except io.UnsupportedOperation:
+            if "PYTEST_CURRENT_TEST" in os.environ:
+                # When we're running under pytest, the stdin is not a real 
filehandle with an fd, so we need
+                # to handle that differently
+                fd = sock.fileno()
+            else:
+                raise
+
+        # Replace the sys.* object with a fresh file object opened on the chosen descriptor.
+        setattr(sys, handle_name, os.fdopen(fd, mode))
+
+    # Local exit helper (shadows the builtin ``exit`` inside this function): flush what we can, then
+    # leave via os._exit. Flush failures (ValueError/OSError) are deliberately ignored.
+    def exit(n: int) -> NoReturn:
+        with suppress(ValueError, OSError):
+            sys.stdout.flush()
+        with suppress(ValueError, OSError):
+            sys.stderr.flush()
+        with suppress(ValueError, OSError):
+            last_chance_stderr.flush()
+        os._exit(n)
+
+    # ``atexit._clear`` is a private API, hence the hasattr guard.
+    if hasattr(atexit, "_clear"):
+        # Since we're in a fork we want to try and clear them
+        atexit._clear()
+        base_exit = exit
+
+        # Wrap the helper so handlers registered after the clear above are run before exiting.
+        def exit(n: int) -> NoReturn:
+            atexit._run_exitfuncs()
+            base_exit(n)
+
+    try:
+        target()
+        exit(0)
+    except SystemExit as e:
+        # NOTE(review): a bare ``sys.exit()`` (code None) keeps the default of 1 here, whereas the
+        # interpreter would treat it as success (0) -- confirm this is intended.
+        code = 1
+        if isinstance(e.code, int):
+            code = e.code
+        elif e.code:
+            print(e.code, file=sys.stderr)
+        exit(code)
+    except Exception:
+        # Last ditch log attempt
+        exc, v, tb = sys.exc_info()
+
+        import traceback
+
+        try:
+            last_chance_stderr.write("--- Last chance exception handler ---\n")
+            traceback.print_exception(exc, value=v, tb=tb, 
file=last_chance_stderr)
+            exit(99)
+        except Exception as e:
+            # Even reporting failed: try once more to say so, ignoring any further error.
+            with suppress(Exception):
+                print(
+                    f"--- Last chance exception handler failed --- 
{repr(str(e))}\n", file=last_chance_stderr
+                )
+            exit(98)

Review Comment:
   Oh was it? Oops, I should change those then. These are just meant to be a 
marker of "hey, something _really_ odd happened in this process"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to