kaxil commented on code in PR #43893: URL: https://github.com/apache/airflow/pull/43893#discussion_r1838589603
########## task_sdk/src/airflow/sdk/api/client.py: ########## @@ -0,0 +1,215 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import uuid +from typing import TYPE_CHECKING, Any + +import httpx +import methodtools +import msgspec +import structlog +from uuid6 import uuid7 + +from airflow.sdk.api.datamodels._generated import ( + ConnectionResponse, + State1 as TerminalState, + TaskInstanceState, + TIEnterRunningPayload, + TITerminalStatePayload, + ValidationError as RemoteValidationError, +) +from airflow.utils.net import get_hostname +from airflow.utils.platform import getuser + +if TYPE_CHECKING: + from datetime import datetime + + +log = structlog.get_logger(logger_name=__name__) + +__all__ = [ + "Client", + "ConnectionOperations", + "ErrorBody", + "ServerResponseError", + "TaskInstanceOperations", +] + + +def get_json_error(response: httpx.Response): + """Raise a ServerResponseError if we can extract error info from the error.""" + err = ServerResponseError.from_response(response) + if err: + log.warning("Server error", detail=err.detail) + raise err + + +def raise_on_4xx_5xx(response: httpx.Response): + return get_json_error(response) or response.raise_for_status() + + +# Py 3.11+ 
version +def raise_on_4xx_5xx_with_note(response: httpx.Response): + try: + return get_json_error(response) or response.raise_for_status() + except httpx.HTTPStatusError as e: + if TYPE_CHECKING: + assert hasattr(e, "add_note") + e.add_note( + f"Correlation-id={response.headers.get('correlation-id', None) or response.request.headers.get('correlation-id', 'no-correlction-id')}" + ) + raise + + +if hasattr(BaseException, "add_note"): + # Py 3.11+ + raise_on_4xx_5xx = raise_on_4xx_5xx_with_note + + +def add_correlation_id(request: httpx.Request): + request.headers["correlation-id"] = str(uuid7()) + + +class TaskInstanceOperations: + __slots__ = ("client",) + + def __init__(self, client: Client): + self.client = client + + def start(self, id: uuid.UUID, pid: int, when: datetime): + """Tell the API server that this TI has started running.""" + body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), unixname=getuser(), start_date=when) + + self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body)) + + def finish(self, id: uuid.UUID, state: TaskInstanceState, when: datetime): + """Tell the API server that this TI has reached a terminal state.""" + body = TITerminalStatePayload(end_date=when, state=TerminalState(state)) + + self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body)) + + def heartbeat(self, id: uuid.UUID): + self.client.put(f"task_instance/{id}/heartbeat") + + +class ConnectionOperations: + __slots__ = ("client", "decoder") + + def __init__(self, client: Client): + self.client = client + self.decoder: msgspec.json.Decoder[ConnectionResponse] = msgspec.json.Decoder(type=ConnectionResponse) + + def get(self, id: str) -> ConnectionResponse: + """Get a connection from the API server.""" + resp = self.client.get(f"connection/{id}") + return self.decoder.decode(resp.read()) + + +class BearerAuth(httpx.Auth): + def __init__(self, token: str): + self.token: str = token + + def auth_flow(self, 
request: httpx.Request): + if self.token: + request.headers["Authorization"] = "Bearer " + self.token + yield request + + +def noop_handler(request: httpx.Request) -> httpx.Response: + log.debug("Dry-run request", method=request.method, path=request.url.path) + return httpx.Response(200, json={"text": "Hello, world!"}) + + +class Client(httpx.Client): + encoder: msgspec.json.Encoder + + def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, **kwargs: Any): + if (not base_url) ^ dry_run: + raise ValueError(f"Can only specify one of {base_url=} or {dry_run=}") + auth = BearerAuth(token) + + self.encoder = msgspec.json.Encoder() + if dry_run: + # If dry run is requests, install a no op handler so that simple tasks can "heartbeat" using a + # real client, but just don't make any HTTP requests + kwargs["transport"] = httpx.MockTransport(noop_handler) + kwargs["base_url"] = "dry-run://server" + else: + kwargs["base_url"] = base_url + super().__init__( + auth=auth, + headers={"airflow-api-version": "2024-07-30"}, + event_hooks={"response": [raise_on_4xx_5xx], "request": [add_correlation_id]}, Review Comment: I assume this is mainly for tracking each request with id & log ########## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ########## @@ -0,0 +1,552 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +"""Supervise and run Tasks in a subprocess.""" + +from __future__ import annotations + +import atexit +import io +import logging +import os +import selectors +import signal +import sys +import time +import weakref +from collections.abc import Generator +from contextlib import suppress +from datetime import datetime, timezone +from socket import socket, socketpair +from typing import TYPE_CHECKING, BinaryIO, Callable, ClassVar, Literal, NoReturn, cast, overload +from uuid import UUID + +import attrs +import httpx +import msgspec +import psutil +import structlog + +from airflow.sdk.api.client import Client +from airflow.sdk.api.datamodels._generated import TaskInstanceState +from airflow.sdk.execution_time.comms import ConnectionResponse, GetConnection, StartupDetails, ToSupervisor + +if TYPE_CHECKING: + from structlog.typing import FilteringBoundLogger + + from airflow.sdk.api.datamodels.activities import ExecuteTaskActivity + from airflow.sdk.api.datamodels.ti import TaskInstance + + +__all__ = ["WatchedSubprocess", "supervise"] + +log: FilteringBoundLogger = structlog.get_logger(logger_name="supervisor") + +# TODO: Pull this from config +SLOWEST_HEARTBEAT_INTERVAL: int = 30 +# Don't heartbeat more often than this +FASTEST_HEARTBEAT_INTERVAL: int = 5 + + +@overload +def mkpipe() -> tuple[socket, socket]: ... + + +@overload +def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ... + + +def mkpipe( + remote_read: bool = False, +) -> tuple[socket, socket | BinaryIO]: + """ + Create a pair of connected sockets. + + The inheritable flag will be set correctly so that the end destined for the subprocess is kept open but + the end for this process is closed automatically by the OS. 
+ """ + rsock, wsock = socketpair() + local, remote = (wsock, rsock) if remote_read else (rsock, wsock) + + remote.set_inheritable(True) + local.setblocking(False) + + io: BinaryIO | socket + if remote_read: + # If _we_ are writing, we don't want to buffer + io = cast(BinaryIO, local.makefile("wb", buffering=0)) + else: + io = local + + return remote, io + + +def _subprocess_main(): + from airflow.sdk.execution_time.task_runner import main + + main() + + +def _fork_main( + child_stdin: socket, + child_stdout: socket, + child_stderr: socket, + log_fd: int, + target: Callable[[], None], +) -> NoReturn: + # TODO: Make this process a session leader + + # Uninstall the rich etc. exception handler + sys.excepthook = sys.__excepthook__ + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGUSR2, signal.SIG_DFL) + + if log_fd > 0: + # A channel that the task can send JSON-formated logs over. + # + # JSON logs sent this way will be handled nicely + from airflow.sdk.log import configure_logging + + log_io = os.fdopen(log_fd, "wb", buffering=0) + configure_logging(enable_pretty_log=False, output=log_io) + + last_chance_stderr = sys.__stderr__ or sys.stderr + + if "PYTEST_CURRENT_TEST" in os.environ: + # When we are running in pytest, it's output capturing messes us up. 
This works around it + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + + # Ensure that sys.stdout et al (and the underlying filehandles for C libraries etc) are connected to the + # pipes form the supervisor + + for handle_name, sock, mode, close in ( + ("stdin", child_stdin, "r", True), + ("stdout", child_stdout, "w", True), + ("stderr", child_stderr, "w", False), + ): + handle = getattr(sys, handle_name) + try: + fd = handle.fileno() + os.dup2(sock.fileno(), fd) + if close: + handle.close() + except io.UnsupportedOperation: + if "PYTEST_CURRENT_TEST" in os.environ: + # When we're running under pytest, the stdin is not a real filehandle with an fd, so we need + # to handle that differently + fd = sock.fileno() + else: + raise + + setattr(sys, handle_name, os.fdopen(fd, mode)) + + def exit(n: int) -> NoReturn: + with suppress(ValueError, OSError): + sys.stdout.flush() + with suppress(ValueError, OSError): + sys.stderr.flush() + with suppress(ValueError, OSError): + last_chance_stderr.flush() + os._exit(n) + + if hasattr(atexit, "_clear"): + # Since we're in a fork we want to try and clear them + atexit._clear() + base_exit = exit + + def exit(n: int) -> NoReturn: + atexit._run_exitfuncs() + base_exit(n) + + try: + target() + exit(0) + except SystemExit as e: + code = 1 + if isinstance(e.code, int): + code = e.code + elif e.code: + print(e.code, file=sys.stderr) + exit(code) + except Exception: + # Last ditch log attempt + exc, v, tb = sys.exc_info() + + import traceback + + try: + last_chance_stderr.write("--- Last chance exception handler ---\n") + traceback.print_exception(exc, value=v, tb=tb, file=last_chance_stderr) + exit(99) + except Exception as e: + with suppress(Exception): + print( + f"--- Last chance exception handler failed --- {repr(str(e))}\n", file=last_chance_stderr + ) + exit(98) + + [email protected]() +class WatchedSubprocess: + ti_id: UUID + pid: int + + stdin: BinaryIO + stdout: socket + stderr: socket + + client: Client + + 
_process: psutil.Process + _exit_code: int | None = None + _terminal_state: str | None = None + + _last_heartbeat: float = 0 + + selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector) + + procs: ClassVar[weakref.WeakValueDictionary[int, WatchedSubprocess]] = weakref.WeakValueDictionary() + + def __attrs_post_init__(self): + self.procs[self.pid] = self + + @classmethod + def start( + cls, + path: str | os.PathLike[str], + ti: TaskInstance, + client: Client, + target: Callable[[], None] = _subprocess_main, + ) -> WatchedSubprocess: + """Fork and start a new subprocess to execute the given task.""" + # Create socketpairs/"pipes" to connect to the stdin and out from the subprocess + child_stdin, feed_stdin = mkpipe(remote_read=True) + child_stdout, read_stdout = mkpipe() + child_stderr, read_stderr = mkpipe() + + # Open these socketpair before forking off the child, so that it is open when we fork. + child_comms, read_msgs = mkpipe() + child_logs, read_logs = mkpipe() + + pid = os.fork() + if pid == 0: + # Parent ends of the sockets are closed by the OS as they are set as non-inheritable + + # Run the child entryoint + _fork_main(child_stdin, child_stdout, child_stderr, child_logs.fileno(), target) + + proc = cls( + ti_id=ti.id, + pid=pid, + stdin=feed_stdin, + stdout=read_stdout, + stderr=read_stderr, + process=psutil.Process(pid), + client=client, + ) + + # We've forked, but the task won't start until we send it the StartupDetails message. But before we do + # that, we need to tell the server it's started (so it has the chance to tell us "no, stop!" for any + # reason) + try: + client.task_instances.start(ti.id, pid, datetime.now(tz=timezone.utc)) + proc._last_heartbeat = time.monotonic() + except Exception: + # On any error kill that subprocess! 
+ proc.kill(signal.SIGKILL) + raise + + # TODO: Use logging providers to handle the chunked upload for us + task_logger: FilteringBoundLogger = structlog.get_logger(logger_name="task").bind() + + cb = make_buffered_socket_reader(forward_to_log(task_logger.bind(chan="stdout"), level=logging.INFO)) + + proc.selector.register(read_stdout, selectors.EVENT_READ, cb) + cb = make_buffered_socket_reader(forward_to_log(task_logger.bind(chan="stderr"), level=logging.ERROR)) + proc.selector.register(read_stderr, selectors.EVENT_READ, cb) + + proc.selector.register( + read_logs, + selectors.EVENT_READ, + make_buffered_socket_reader(process_log_messages_from_subprocess(task_logger)), + ) + proc.selector.register( + read_msgs, + selectors.EVENT_READ, + make_buffered_socket_reader(proc.handle_requests(log=log)), + ) + + # Tell the task process what it needs to do! + msg = StartupDetails( + ti=ti, + file=str(path), + requests_fd=child_comms.fileno(), + ) + + # Close the remaining parent-end of the sockets we've passed to the child via fork. We still have the + # other end of the pair open + child_stdout.close() + child_stdin.close() + child_comms.close() + child_logs.close() + + # Send the message to tell the process what it needs to execute + log.debug("Sending", msg=msg) + feed_stdin.write(msgspec.json.encode(msg)) + feed_stdin.write(b"\n") + + return proc + + def kill(self, signal: signal.Signals = signal.SIGINT): + if self._exit_code is not None: + return + + with suppress(ProcessLookupError): + os.kill(self.pid, signal) + + def wait(self) -> int: + if self._exit_code is not None: + return self._exit_code + + # Until we have a selector for the process, don't poll for more than 10s, just in case it exists but + # doesn't produce any output + max_poll_interval = 10 + + try: + while self._exit_code is None or len(self.selector.get_map()): + last_heartbeat_ago = time.monotonic() - self._last_heartbeat + # Monitor the task to see if it's done. 
Wait in a syscall (`select`) for as long as possible + # so we notice the subprocess finishing as quick as we can. + max_wait_time = max( + 0, # Make sure this value is never negative, + min( + # Ensure we heartbeat _at most_ 75% through the time the zombie threshold time Review Comment: ```suggestion # Ensure we heartbeat _at most_ 75% through the zombie threshold time ``` ########## task_sdk/src/airflow/sdk/api/datamodels/_generated.py: ########## @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# generated by datamodel-codegen: +# filename: http://0.0.0.0:9091/execution/openapi.json +# version: 0.26.3 + +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import Literal + +from msgspec import Struct, field + + +class ConnectionResponse(Struct): + """ + Connection schema for responses with fields that are needed for Runtime.
+ """ + + conn_id: str + conn_type: str + host: str | None = None + schema_: str | None = field(name="schema", default=None) + login: str | None = None + password: str | None = None + port: int | None = None + extra: str | None = None + + +class TIEnterRunningPayload(Struct): + """ + Schema for updating TaskInstance to 'RUNNING' state with minimal required fields. + """ + + hostname: str + unixname: str + pid: int + start_date: datetime + state: Literal["running"] | None = "running" + + +class TIHeartbeatInfo(Struct): + """ + Schema for TaskInstance heartbeat endpoint. + """ + + hostname: str + pid: int + + +class State(Enum): Review Comment: This doesn't have terminal states like `SUCCESS`, `FAILED` or `SKIPPED` ########## task_sdk/src/airflow/sdk/log.py: ########## @@ -0,0 +1,377 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import itertools +import logging.config +import os +import sys +import warnings +from functools import cache +from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Generic, TextIO, TypeVar + +import msgspec +import structlog + +if TYPE_CHECKING: + from structlog.typing import EventDict, ExcInfo, Processor + + +__all__ = [ + "configure_logging", + "reset_logging", +] + + +def exception_group_tracebacks(format_exception: Callable[[ExcInfo], list[dict[str, Any]]]) -> Processor: + # Make mypy happy + if not hasattr(__builtins__, "BaseExceptionGroup"): + T = TypeVar("T") + + class BaseExceptionGroup(Generic[T]): + exceptions: list[T] + + def _exception_group_tracebacks(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict: + if exc_info := event_dict.get("exc_info", None): + group: BaseExceptionGroup[Exception] | None = None + if exc_info is True: + # `log.exception('mesg")` case + exc_info = sys.exc_info() + if exc_info[0] is None: + exc_info = None + + if ( + isinstance(exc_info, tuple) + and len(exc_info) == 3 + and isinstance(exc_info[1], BaseExceptionGroup) + ): + group = exc_info[1] + elif isinstance(exc_info, BaseExceptionGroup): + group = exc_info + + if group: + # Only remove it from event_dict if we handle it + del event_dict["exc_info"] + event_dict["exception"] = list( + itertools.chain.from_iterable( + format_exception((type(exc), exc, exc.__traceback__)) # type: ignore[attr-defined,arg-type] + for exc in (*group.exceptions, group) + ) + ) + + return event_dict + + return _exception_group_tracebacks + + +def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict: + if logger_name := event_dict.pop("logger_name", None): + event_dict.setdefault("logger", logger_name) + return event_dict + + +def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: + for k, v in event_dict.items(): + if isinstance(v, str) and v.startswith("eyJ"): + event_dict[k] = 
"eyJ***" + return event_dict + + +def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict: + event_dict.pop("positional_args", None) + return event_dict + + +def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str: + """Encode event into JSON format.""" + return msgspec.json.encode(event_dict).decode("ascii") + + +class StdBinaryStreamHandler(logging.StreamHandler): + """A logging.StreamHandler that sends logs as binary JSON over the given stream.""" + + stream: BinaryIO + + def __init__(self, stream: BinaryIO): + super().__init__(stream) + + def emit(self, record: logging.LogRecord): + try: + msg = self.format(record) + buffer = bytearray(msg, "ascii", "backslashreplace") + + buffer += b"\n" + + stream = self.stream + stream.write(buffer) + self.flush() + except RecursionError: # See issue 36272 + raise + except Exception: + self.handleError(record) + + +@cache +def logging_processors( + enable_pretty_log: bool, +): + if enable_pretty_log: + timestamper = structlog.processors.MaybeTimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f") + else: + timestamper = structlog.processors.MaybeTimeStamper(fmt="iso") + + processors: list[structlog.typing.Processor] = [ + timestamper, + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.stdlib.PositionalArgumentsFormatter(), + logger_name, + redact_jwt, + structlog.processors.StackInfoRenderer(), + ] + + if enable_pretty_log: + # Imports to suppress showing code from these modules + import asyncio + import contextlib + + import click + import httpcore + import httpx + import typer + + rich_exc_formatter = structlog.dev.RichTracebackFormatter( + extra_lines=0, + max_frames=30, Review Comment: Any reason for 0 and 30 here than the defaults? ########## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ########## @@ -0,0 +1,552 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Supervise and run Tasks in a subprocess.""" + +from __future__ import annotations + +import atexit +import io +import logging +import os +import selectors +import signal +import sys +import time +import weakref +from collections.abc import Generator +from contextlib import suppress +from datetime import datetime, timezone +from socket import socket, socketpair +from typing import TYPE_CHECKING, BinaryIO, Callable, ClassVar, Literal, NoReturn, cast, overload +from uuid import UUID + +import attrs +import httpx +import msgspec +import psutil +import structlog + +from airflow.sdk.api.client import Client +from airflow.sdk.api.datamodels._generated import TaskInstanceState +from airflow.sdk.execution_time.comms import ConnectionResponse, GetConnection, StartupDetails, ToSupervisor + +if TYPE_CHECKING: + from structlog.typing import FilteringBoundLogger + + from airflow.sdk.api.datamodels.activities import ExecuteTaskActivity + from airflow.sdk.api.datamodels.ti import TaskInstance + + +__all__ = ["WatchedSubprocess", "supervise"] + +log: FilteringBoundLogger = structlog.get_logger(logger_name="supervisor") + +# TODO: Pull this from config +SLOWEST_HEARTBEAT_INTERVAL: int = 30 +# Don't heartbeat more often than this +FASTEST_HEARTBEAT_INTERVAL: int = 5 + + +@overload +def mkpipe() -> 
tuple[socket, socket]: ... + + +@overload +def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ... + + +def mkpipe( + remote_read: bool = False, +) -> tuple[socket, socket | BinaryIO]: + """ + Create a pair of connected sockets. + + The inheritable flag will be set correctly so that the end destined for the subprocess is kept open but + the end for this process is closed automatically by the OS. + """ + rsock, wsock = socketpair() + local, remote = (wsock, rsock) if remote_read else (rsock, wsock) + + remote.set_inheritable(True) + local.setblocking(False) + + io: BinaryIO | socket + if remote_read: + # If _we_ are writing, we don't want to buffer + io = cast(BinaryIO, local.makefile("wb", buffering=0)) + else: + io = local + + return remote, io + + +def _subprocess_main(): + from airflow.sdk.execution_time.task_runner import main + + main() + + +def _fork_main( + child_stdin: socket, + child_stdout: socket, + child_stderr: socket, + log_fd: int, + target: Callable[[], None], +) -> NoReturn: Review Comment: Worth adding a docstring with a TL;DR of what this function does. ########## task_sdk/src/airflow/sdk/api/client.py: ########## @@ -0,0 +1,215 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +from __future__ import annotations + +import uuid +from typing import TYPE_CHECKING, Any + +import httpx +import methodtools +import msgspec +import structlog +from uuid6 import uuid7 + +from airflow.sdk.api.datamodels._generated import ( + ConnectionResponse, + State1 as TerminalState, + TaskInstanceState, + TIEnterRunningPayload, + TITerminalStatePayload, + ValidationError as RemoteValidationError, +) +from airflow.utils.net import get_hostname +from airflow.utils.platform import getuser + +if TYPE_CHECKING: + from datetime import datetime + + +log = structlog.get_logger(logger_name=__name__) + +__all__ = [ + "Client", + "ConnectionOperations", + "ErrorBody", + "ServerResponseError", + "TaskInstanceOperations", +] + + +def get_json_error(response: httpx.Response): + """Raise a ServerResponseError if we can extract error info from the error.""" + err = ServerResponseError.from_response(response) + if err: + log.warning("Server error", detail=err.detail) + raise err + + +def raise_on_4xx_5xx(response: httpx.Response): + return get_json_error(response) or response.raise_for_status() + + +# Py 3.11+ version +def raise_on_4xx_5xx_with_note(response: httpx.Response): + try: + return get_json_error(response) or response.raise_for_status() + except httpx.HTTPStatusError as e: + if TYPE_CHECKING: + assert hasattr(e, "add_note") + e.add_note( + f"Correlation-id={response.headers.get('correlation-id', None) or response.request.headers.get('correlation-id', 'no-correlction-id')}" + ) + raise + + +if hasattr(BaseException, "add_note"): + # Py 3.11+ + raise_on_4xx_5xx = raise_on_4xx_5xx_with_note + + +def add_correlation_id(request: httpx.Request): + request.headers["correlation-id"] = str(uuid7()) + + +class TaskInstanceOperations: + __slots__ = ("client",) + + def __init__(self, client: Client): + self.client = client + + def start(self, id: uuid.UUID, pid: int, when: datetime): + """Tell the API server that this TI has started running.""" + body = 
TIEnterRunningPayload(pid=pid, hostname=get_hostname(), unixname=getuser(), start_date=when) + + self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body)) + + def finish(self, id: uuid.UUID, state: TaskInstanceState, when: datetime): + """Tell the API server that this TI has reached a terminal state.""" + body = TITerminalStatePayload(end_date=when, state=TerminalState(state)) + + self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body)) + + def heartbeat(self, id: uuid.UUID): + self.client.put(f"task_instance/{id}/heartbeat") + + +class ConnectionOperations: + __slots__ = ("client", "decoder") + + def __init__(self, client: Client): + self.client = client + self.decoder: msgspec.json.Decoder[ConnectionResponse] = msgspec.json.Decoder(type=ConnectionResponse) + + def get(self, id: str) -> ConnectionResponse: + """Get a connection from the API server.""" + resp = self.client.get(f"connection/{id}") + return self.decoder.decode(resp.read()) + + +class BearerAuth(httpx.Auth): + def __init__(self, token: str): + self.token: str = token + + def auth_flow(self, request: httpx.Request): + if self.token: + request.headers["Authorization"] = "Bearer " + self.token + yield request + + +def noop_handler(request: httpx.Request) -> httpx.Response: + log.debug("Dry-run request", method=request.method, path=request.url.path) + return httpx.Response(200, json={"text": "Hello, world!"}) + + +class Client(httpx.Client): + encoder: msgspec.json.Encoder + + def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, **kwargs: Any): + if (not base_url) ^ dry_run: + raise ValueError(f"Can only specify one of {base_url=} or {dry_run=}") + auth = BearerAuth(token) + + self.encoder = msgspec.json.Encoder() + if dry_run: + # If dry run is requests, install a no op handler so that simple tasks can "heartbeat" using a + # real client, but just don't make any HTTP requests + kwargs["transport"] = 
httpx.MockTransport(noop_handler) + kwargs["base_url"] = "dry-run://server" + else: + kwargs["base_url"] = base_url + super().__init__( + auth=auth, + headers={"airflow-api-version": "2024-07-30"}, + event_hooks={"response": [raise_on_4xx_5xx], "request": [add_correlation_id]}, + **kwargs, + ) + + # We "group" or "namespace" operations by what they operate on, rather than a flat namespace with all + # methods on one object prefixed with the object type (`.task_instances.update` rather than + # `task_instance_update` etc.) + + @methodtools.lru_cache() # type: ignore[misc] + @property + def task_instances(self) -> TaskInstanceOperations: + """Operations related to TaskInstances.""" + return TaskInstanceOperations(self) + + @methodtools.lru_cache() # type: ignore[misc] + @property + def connections(self) -> ConnectionOperations: + """Operations related to TaskInstances.""" + return ConnectionOperations(self) + + +class ErrorBody(msgspec.Struct): + detail: list[RemoteValidationError] | dict[str, Any] + + def __repr__(self): + return repr(self.detail) + + +class ServerResponseError(httpx.HTTPStatusError): + def __init__(self, message: str, *, request: httpx.Request, response: httpx.Response): + super().__init__(message, request=request, response=response) + + detail: ErrorBody + + @classmethod + def from_response(cls, response: httpx.Response) -> ServerResponseError | None: + if response.is_success: + return None + # 4xx or 5xx error? 
+ if 400 < (response.status_code // 100) >= 600: + return None + + if response.headers.get("content-type") != "application/json": + return None + + try: + err = msgspec.json.decode(response.read(), type=ErrorBody) + if isinstance(err.detail, list): + msg = "Remote server returned validation error" + else: + msg = err.detail.get("message", "") or "Un-parseable error" + except Exception: + err = msgspec.json.decode(response.content) + msg = "Server returned error" + + self = cls(msg, request=response.request, response=response) + self.detail = err + return self Review Comment: Once we start adding more things, it is worth adding tests for functions like this one. For example, the check `400 < (response.status_code // 100) >= 600` is always False, because it compares 400 against 4 or 5. So it never filters out non-4xx/5xx responses; presumably `not 400 <= response.status_code < 600` was intended. ########## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ########## @@ -0,0 +1,552 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+"""Supervise and run Tasks in a subprocess.""" + +from __future__ import annotations + +import atexit +import io +import logging +import os +import selectors +import signal +import sys +import time +import weakref +from collections.abc import Generator +from contextlib import suppress +from datetime import datetime, timezone +from socket import socket, socketpair +from typing import TYPE_CHECKING, BinaryIO, Callable, ClassVar, Literal, NoReturn, cast, overload +from uuid import UUID + +import attrs +import httpx +import msgspec +import psutil +import structlog + +from airflow.sdk.api.client import Client +from airflow.sdk.api.datamodels._generated import TaskInstanceState +from airflow.sdk.execution_time.comms import ConnectionResponse, GetConnection, StartupDetails, ToSupervisor + +if TYPE_CHECKING: + from structlog.typing import FilteringBoundLogger + + from airflow.sdk.api.datamodels.activities import ExecuteTaskActivity + from airflow.sdk.api.datamodels.ti import TaskInstance + + +__all__ = ["WatchedSubprocess", "supervise"] + +log: FilteringBoundLogger = structlog.get_logger(logger_name="supervisor") + +# TODO: Pull this from config +SLOWEST_HEARTBEAT_INTERVAL: int = 30 +# Don't heartbeat more often than this +FASTEST_HEARTBEAT_INTERVAL: int = 5 + + +@overload +def mkpipe() -> tuple[socket, socket]: ... + + +@overload +def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ... + + +def mkpipe( + remote_read: bool = False, +) -> tuple[socket, socket | BinaryIO]: + """ + Create a pair of connected sockets. + + The inheritable flag will be set correctly so that the end destined for the subprocess is kept open but + the end for this process is closed automatically by the OS. 
+ """ + rsock, wsock = socketpair() + local, remote = (wsock, rsock) if remote_read else (rsock, wsock) + + remote.set_inheritable(True) + local.setblocking(False) + + io: BinaryIO | socket + if remote_read: + # If _we_ are writing, we don't want to buffer + io = cast(BinaryIO, local.makefile("wb", buffering=0)) + else: + io = local + + return remote, io + + +def _subprocess_main(): + from airflow.sdk.execution_time.task_runner import main + + main() + + +def _fork_main( + child_stdin: socket, + child_stdout: socket, + child_stderr: socket, + log_fd: int, + target: Callable[[], None], +) -> NoReturn: + # TODO: Make this process a session leader + + # Uninstall the rich etc. exception handler + sys.excepthook = sys.__excepthook__ + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGUSR2, signal.SIG_DFL) + + if log_fd > 0: + # A channel that the task can send JSON-formated logs over. + # + # JSON logs sent this way will be handled nicely + from airflow.sdk.log import configure_logging + + log_io = os.fdopen(log_fd, "wb", buffering=0) + configure_logging(enable_pretty_log=False, output=log_io) + + last_chance_stderr = sys.__stderr__ or sys.stderr + + if "PYTEST_CURRENT_TEST" in os.environ: + # When we are running in pytest, it's output capturing messes us up. 
This works around it + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + + # Ensure that sys.stdout et al (and the underlying filehandles for C libraries etc) are connected to the + # pipes form the supervisor + + for handle_name, sock, mode, close in ( + ("stdin", child_stdin, "r", True), + ("stdout", child_stdout, "w", True), + ("stderr", child_stderr, "w", False), + ): + handle = getattr(sys, handle_name) + try: + fd = handle.fileno() + os.dup2(sock.fileno(), fd) + if close: + handle.close() + except io.UnsupportedOperation: + if "PYTEST_CURRENT_TEST" in os.environ: + # When we're running under pytest, the stdin is not a real filehandle with an fd, so we need + # to handle that differently + fd = sock.fileno() + else: + raise + + setattr(sys, handle_name, os.fdopen(fd, mode)) + + def exit(n: int) -> NoReturn: + with suppress(ValueError, OSError): + sys.stdout.flush() + with suppress(ValueError, OSError): + sys.stderr.flush() + with suppress(ValueError, OSError): + last_chance_stderr.flush() + os._exit(n) + + if hasattr(atexit, "_clear"): + # Since we're in a fork we want to try and clear them + atexit._clear() + base_exit = exit + + def exit(n: int) -> NoReturn: + atexit._run_exitfuncs() + base_exit(n) + + try: + target() + exit(0) + except SystemExit as e: + code = 1 + if isinstance(e.code, int): + code = e.code + elif e.code: + print(e.code, file=sys.stderr) + exit(code) + except Exception: + # Last ditch log attempt + exc, v, tb = sys.exc_info() + + import traceback + + try: + last_chance_stderr.write("--- Last chance exception handler ---\n") + traceback.print_exception(exc, value=v, tb=tb, file=last_chance_stderr) + exit(99) + except Exception as e: + with suppress(Exception): + print( + f"--- Last chance exception handler failed --- {repr(str(e))}\n", file=last_chance_stderr + ) + exit(98) Review Comment: Does exit code `98` and `99` represent something? or we use it to handle the logic somewhere? 
########## task_sdk/tests/execution_time/test_supervisor.py: ########## @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import inspect +import logging +import os +import signal +import sys +from unittest.mock import MagicMock + +import pytest +import structlog +import structlog.testing + +import airflow.sdk.api.client Review Comment: ```suggestion from airflow.sdk.api import client ``` ########## task_sdk/src/airflow/sdk/api/client.py: ########## @@ -0,0 +1,215 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import uuid +from typing import TYPE_CHECKING, Any + +import httpx +import methodtools +import msgspec +import structlog +from uuid6 import uuid7 + +from airflow.sdk.api.datamodels._generated import ( + ConnectionResponse, + State1 as TerminalState, + TaskInstanceState, + TIEnterRunningPayload, + TITerminalStatePayload, + ValidationError as RemoteValidationError, +) +from airflow.utils.net import get_hostname +from airflow.utils.platform import getuser + +if TYPE_CHECKING: + from datetime import datetime + + +log = structlog.get_logger(logger_name=__name__) + +__all__ = [ + "Client", + "ConnectionOperations", + "ErrorBody", + "ServerResponseError", + "TaskInstanceOperations", +] + + +def get_json_error(response: httpx.Response): + """Raise a ServerResponseError if we can extract error info from the error.""" + err = ServerResponseError.from_response(response) + if err: + log.warning("Server error", detail=err.detail) + raise err + + +def raise_on_4xx_5xx(response: httpx.Response): + return get_json_error(response) or response.raise_for_status() + + +# Py 3.11+ version +def raise_on_4xx_5xx_with_note(response: httpx.Response): + try: + return get_json_error(response) or response.raise_for_status() + except httpx.HTTPStatusError as e: + if TYPE_CHECKING: + assert hasattr(e, "add_note") + e.add_note( + f"Correlation-id={response.headers.get('correlation-id', None) or response.request.headers.get('correlation-id', 'no-correlction-id')}" + ) + raise + + +if hasattr(BaseException, "add_note"): + # Py 3.11+ + raise_on_4xx_5xx = raise_on_4xx_5xx_with_note + + +def add_correlation_id(request: httpx.Request): + request.headers["correlation-id"] = str(uuid7()) + + +class TaskInstanceOperations: + __slots__ = ("client",) + + def __init__(self, client: Client): + self.client = client + + def start(self, id: uuid.UUID, pid: int, when: 
datetime): + """Tell the API server that this TI has started running.""" + body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), unixname=getuser(), start_date=when) + + self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body)) + + def finish(self, id: uuid.UUID, state: TaskInstanceState, when: datetime): + """Tell the API server that this TI has reached a terminal state.""" + body = TITerminalStatePayload(end_date=when, state=TerminalState(state)) + + self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body)) + + def heartbeat(self, id: uuid.UUID): + self.client.put(f"task_instance/{id}/heartbeat") + + +class ConnectionOperations: + __slots__ = ("client", "decoder") + + def __init__(self, client: Client): + self.client = client + self.decoder: msgspec.json.Decoder[ConnectionResponse] = msgspec.json.Decoder(type=ConnectionResponse) + + def get(self, id: str) -> ConnectionResponse: + """Get a connection from the API server.""" + resp = self.client.get(f"connection/{id}") + return self.decoder.decode(resp.read()) + + +class BearerAuth(httpx.Auth): + def __init__(self, token: str): + self.token: str = token + + def auth_flow(self, request: httpx.Request): + if self.token: + request.headers["Authorization"] = "Bearer " + self.token + yield request + + +def noop_handler(request: httpx.Request) -> httpx.Response: + log.debug("Dry-run request", method=request.method, path=request.url.path) + return httpx.Response(200, json={"text": "Hello, world!"}) + + +class Client(httpx.Client): + encoder: msgspec.json.Encoder + + def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, **kwargs: Any): + if (not base_url) ^ dry_run: + raise ValueError(f"Can only specify one of {base_url=} or {dry_run=}") + auth = BearerAuth(token) + + self.encoder = msgspec.json.Encoder() + if dry_run: + # If dry run is requests, install a no op handler so that simple tasks can "heartbeat" using a + # real 
client, but just don't make any HTTP requests + kwargs["transport"] = httpx.MockTransport(noop_handler) + kwargs["base_url"] = "dry-run://server" + else: + kwargs["base_url"] = base_url + super().__init__( + auth=auth, + headers={"airflow-api-version": "2024-07-30"}, Review Comment: For PR reviewers: To be handled in a separate PR ########## task_sdk/src/airflow/sdk/api/datamodels/_generated.py: ########## @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# generated by datamodel-codegen: +# filename: http://0.0.0.0:9091/execution/openapi.json +# version: 0.26.3 + +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import Literal + +from msgspec import Struct, field + + +class ConnectionResponse(Struct): + """ + Connection schema for responses with fields that are needed for Runtime. + """ + + conn_id: str + conn_type: str + host: str | None = None + schema_: str | None = field(name="schema", default=None) + login: str | None = None + password: str | None = None + port: int | None = None + extra: str | None = None + + +class TIEnterRunningPayload(Struct): + """ + Schema for updating TaskInstance to 'RUNNING' state with minimal required fields. 
+ """ + + hostname: str + unixname: str + pid: int + start_date: datetime + state: Literal["running"] | None = "running" + + +class TIHeartbeatInfo(Struct): + """ + Schema for TaskInstance heartbeat endpoint. + """ + + hostname: str + pid: int + + +class State(Enum): + REMOVED = "removed" + SCHEDULED = "scheduled" + QUEUED = "queued" + RUNNING = "running" + RESTARTING = "restarting" + UP_FOR_RETRY = "up_for_retry" + UP_FOR_RESCHEDULE = "up_for_reschedule" + UPSTREAM_FAILED = "upstream_failed" + DEFERRED = "deferred" + + +class TITargetStatePayload(Struct): + """ + Schema for updating TaskInstance to a target state, excluding terminal and running states. + """ + + state: State + + +class State1(Enum): Review Comment: Yeah, I will fix it in a separate PR ########## task_sdk/src/airflow/sdk/api/client.py: ########## @@ -0,0 +1,215 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import uuid +from typing import TYPE_CHECKING, Any + +import httpx +import methodtools +import msgspec +import structlog +from uuid6 import uuid7 + +from airflow.sdk.api.datamodels._generated import ( + ConnectionResponse, + State1 as TerminalState, + TaskInstanceState, + TIEnterRunningPayload, + TITerminalStatePayload, + ValidationError as RemoteValidationError, +) +from airflow.utils.net import get_hostname +from airflow.utils.platform import getuser + +if TYPE_CHECKING: + from datetime import datetime + + +log = structlog.get_logger(logger_name=__name__) + +__all__ = [ + "Client", + "ConnectionOperations", + "ErrorBody", + "ServerResponseError", + "TaskInstanceOperations", +] + + +def get_json_error(response: httpx.Response): + """Raise a ServerResponseError if we can extract error info from the error.""" + err = ServerResponseError.from_response(response) + if err: + log.warning("Server error", detail=err.detail) + raise err + + +def raise_on_4xx_5xx(response: httpx.Response): + return get_json_error(response) or response.raise_for_status() + + +# Py 3.11+ version +def raise_on_4xx_5xx_with_note(response: httpx.Response): + try: + return get_json_error(response) or response.raise_for_status() + except httpx.HTTPStatusError as e: + if TYPE_CHECKING: + assert hasattr(e, "add_note") + e.add_note( + f"Correlation-id={response.headers.get('correlation-id', None) or response.request.headers.get('correlation-id', 'no-correlction-id')}" + ) + raise + + +if hasattr(BaseException, "add_note"): + # Py 3.11+ + raise_on_4xx_5xx = raise_on_4xx_5xx_with_note + + +def add_correlation_id(request: httpx.Request): + request.headers["correlation-id"] = str(uuid7()) + + +class TaskInstanceOperations: + __slots__ = ("client",) + + def __init__(self, client: Client): + self.client = client + + def start(self, id: uuid.UUID, pid: int, when: datetime): + """Tell the API server that this TI has started running.""" + body = 
TIEnterRunningPayload(pid=pid, hostname=get_hostname(), unixname=getuser(), start_date=when) + + self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body)) Review Comment: I changed this recently to `task-instance` but we can handle it in a separate PR ########## task_sdk/tests/execution_time/test_supervisor.py: ########## @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import inspect +import logging +import os +import signal +import sys +from unittest.mock import MagicMock + +import pytest +import structlog +import structlog.testing + +import airflow.sdk.api.client Review Comment: or was it on purpose? ########## task_sdk/src/airflow/sdk/log.py: ########## @@ -0,0 +1,377 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import itertools +import logging.config +import os +import sys +import warnings +from functools import cache +from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Generic, TextIO, TypeVar + +import msgspec +import structlog + +if TYPE_CHECKING: + from structlog.typing import EventDict, ExcInfo, Processor + + +__all__ = [ + "configure_logging", + "reset_logging", +] + + +def exception_group_tracebacks(format_exception: Callable[[ExcInfo], list[dict[str, Any]]]) -> Processor: + # Make mypy happy + if not hasattr(__builtins__, "BaseExceptionGroup"): + T = TypeVar("T") + + class BaseExceptionGroup(Generic[T]): + exceptions: list[T] + + def _exception_group_tracebacks(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict: + if exc_info := event_dict.get("exc_info", None): + group: BaseExceptionGroup[Exception] | None = None + if exc_info is True: + # `log.exception('mesg")` case + exc_info = sys.exc_info() + if exc_info[0] is None: + exc_info = None + + if ( + isinstance(exc_info, tuple) + and len(exc_info) == 3 + and isinstance(exc_info[1], BaseExceptionGroup) + ): + group = exc_info[1] + elif isinstance(exc_info, BaseExceptionGroup): + group = exc_info + + if group: + # Only remove it from event_dict if we handle it + del event_dict["exc_info"] + event_dict["exception"] = list( + itertools.chain.from_iterable( + format_exception((type(exc), exc, exc.__traceback__)) # type: ignore[attr-defined,arg-type] + for exc in (*group.exceptions, group) + ) + ) + + return event_dict + + return 
_exception_group_tracebacks + + +def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict: + if logger_name := event_dict.pop("logger_name", None): + event_dict.setdefault("logger", logger_name) + return event_dict + + +def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: + for k, v in event_dict.items(): + if isinstance(v, str) and v.startswith("eyJ"): + event_dict[k] = "eyJ***" + return event_dict + + +def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict: + event_dict.pop("positional_args", None) + return event_dict + + +def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str: + """Encode event into JSON format.""" + return msgspec.json.encode(event_dict).decode("ascii") + + +class StdBinaryStreamHandler(logging.StreamHandler): + """A logging.StreamHandler that sends logs as binary JSON over the given stream.""" + + stream: BinaryIO + + def __init__(self, stream: BinaryIO): + super().__init__(stream) + + def emit(self, record: logging.LogRecord): + try: + msg = self.format(record) + buffer = bytearray(msg, "ascii", "backslashreplace") + + buffer += b"\n" + + stream = self.stream + stream.write(buffer) + self.flush() + except RecursionError: # See issue 36272 + raise + except Exception: + self.handleError(record) + + +@cache +def logging_processors( + enable_pretty_log: bool, +): + if enable_pretty_log: + timestamper = structlog.processors.MaybeTimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f") + else: + timestamper = structlog.processors.MaybeTimeStamper(fmt="iso") + + processors: list[structlog.typing.Processor] = [ + timestamper, + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.stdlib.PositionalArgumentsFormatter(), + logger_name, + redact_jwt, + structlog.processors.StackInfoRenderer(), + ] + + if enable_pretty_log: + # Imports to suppress showing code from these modules + import asyncio + import contextlib + + 
import click + import httpcore + import httpx + import typer + + rich_exc_formatter = structlog.dev.RichTracebackFormatter( + extra_lines=0, + max_frames=30, + indent_guides=False, + suppress=[asyncio, httpcore, httpx, contextlib, click, typer], + ) + my_styles = structlog.dev.ConsoleRenderer.get_default_level_styles() + my_styles["debug"] = structlog.dev.CYAN + + console = structlog.dev.ConsoleRenderer( + exception_formatter=rich_exc_formatter, level_styles=my_styles + ) + processors.append(console) + return processors, { + "timestamper": timestamper, + "console": console, + } + else: + # Imports to suppress showing code from these modules + import asyncio + import contextlib + + import click + import httpcore + import httpx + import typer + + dict_exc_formatter = structlog.tracebacks.ExceptionDictTransformer( + use_rich=False, show_locals=False, suppress=(click, typer) + ) + + dict_tracebacks = structlog.processors.ExceptionRenderer( + structlog.tracebacks.ExceptionDictTransformer( + use_rich=False, show_locals=False, suppress=(click, typer) + ) + ) + if hasattr(__builtins__, "BaseExceptionGroup"): + exc_group_processor = exception_group_tracebacks(dict_exc_formatter) + processors.append(exc_group_processor) + else: + exc_group_processor = None + + encoder = msgspec.json.Encoder() + + def json_dumps(msg, default): + return encoder.encode(msg) + + def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str: + # import web_pdb + + # web_pdb.set_trace() Review Comment: ```suggestion ``` ########## task_sdk/src/airflow/sdk/api/client.py: ########## @@ -0,0 +1,215 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import uuid +from typing import TYPE_CHECKING, Any + +import httpx +import methodtools +import msgspec +import structlog +from uuid6 import uuid7 + +from airflow.sdk.api.datamodels._generated import ( + ConnectionResponse, + State1 as TerminalState, + TaskInstanceState, + TIEnterRunningPayload, + TITerminalStatePayload, + ValidationError as RemoteValidationError, +) +from airflow.utils.net import get_hostname +from airflow.utils.platform import getuser + +if TYPE_CHECKING: + from datetime import datetime + + +log = structlog.get_logger(logger_name=__name__) + +__all__ = [ + "Client", + "ConnectionOperations", + "ErrorBody", + "ServerResponseError", + "TaskInstanceOperations", +] + + +def get_json_error(response: httpx.Response): + """Raise a ServerResponseError if we can extract error info from the error.""" + err = ServerResponseError.from_response(response) + if err: + log.warning("Server error", detail=err.detail) + raise err + + +def raise_on_4xx_5xx(response: httpx.Response): + return get_json_error(response) or response.raise_for_status() + + +# Py 3.11+ version +def raise_on_4xx_5xx_with_note(response: httpx.Response): + try: + return get_json_error(response) or response.raise_for_status() + except httpx.HTTPStatusError as e: + if TYPE_CHECKING: + assert hasattr(e, "add_note") + e.add_note( + f"Correlation-id={response.headers.get('correlation-id', None) or response.request.headers.get('correlation-id', 'no-correlction-id')}" + ) + raise + + +if hasattr(BaseException, "add_note"): + # Py 3.11+ + 
raise_on_4xx_5xx = raise_on_4xx_5xx_with_note + + +def add_correlation_id(request: httpx.Request): + request.headers["correlation-id"] = str(uuid7()) + + +class TaskInstanceOperations: + __slots__ = ("client",) + + def __init__(self, client: Client): + self.client = client + + def start(self, id: uuid.UUID, pid: int, when: datetime): + """Tell the API server that this TI has started running.""" + body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), unixname=getuser(), start_date=when) + + self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body)) + + def finish(self, id: uuid.UUID, state: TaskInstanceState, when: datetime): + """Tell the API server that this TI has reached a terminal state.""" + body = TITerminalStatePayload(end_date=when, state=TerminalState(state)) + + self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body)) + + def heartbeat(self, id: uuid.UUID): + self.client.put(f"task_instance/{id}/heartbeat") + + +class ConnectionOperations: + __slots__ = ("client", "decoder") + + def __init__(self, client: Client): + self.client = client + self.decoder: msgspec.json.Decoder[ConnectionResponse] = msgspec.json.Decoder(type=ConnectionResponse) + + def get(self, id: str) -> ConnectionResponse: + """Get a connection from the API server.""" + resp = self.client.get(f"connection/{id}") + return self.decoder.decode(resp.read()) + + +class BearerAuth(httpx.Auth): + def __init__(self, token: str): + self.token: str = token + + def auth_flow(self, request: httpx.Request): + if self.token: + request.headers["Authorization"] = "Bearer " + self.token + yield request + + +def noop_handler(request: httpx.Request) -> httpx.Response: + log.debug("Dry-run request", method=request.method, path=request.url.path) + return httpx.Response(200, json={"text": "Hello, world!"}) + + +class Client(httpx.Client): + encoder: msgspec.json.Encoder + + def __init__(self, *, base_url: str | None, dry_run: bool = False, 
token: str, **kwargs: Any): + if (not base_url) ^ dry_run: + raise ValueError(f"Can only specify one of {base_url=} or {dry_run=}") + auth = BearerAuth(token) + + self.encoder = msgspec.json.Encoder() + if dry_run: + # If dry run is requests, install a no op handler so that simple tasks can "heartbeat" using a + # real client, but just don't make any HTTP requests + kwargs["transport"] = httpx.MockTransport(noop_handler) + kwargs["base_url"] = "dry-run://server" + else: + kwargs["base_url"] = base_url + super().__init__( + auth=auth, + headers={"airflow-api-version": "2024-07-30"}, + event_hooks={"response": [raise_on_4xx_5xx], "request": [add_correlation_id]}, + **kwargs, + ) + + # We "group" or "namespace" operations by what they operate on, rather than a flat namespace with all + # methods on one object prefixed with the object type (`.task_instances.update` rather than + # `task_instance_update` etc.) + + @methodtools.lru_cache() # type: ignore[misc] + @property + def task_instances(self) -> TaskInstanceOperations: + """Operations related to TaskInstances.""" + return TaskInstanceOperations(self) + + @methodtools.lru_cache() # type: ignore[misc] + @property + def connections(self) -> ConnectionOperations: + """Operations related to TaskInstances.""" + return ConnectionOperations(self) + + +class ErrorBody(msgspec.Struct): + detail: list[RemoteValidationError] | dict[str, Any] + + def __repr__(self): + return repr(self.detail) + + +class ServerResponseError(httpx.HTTPStatusError): + def __init__(self, message: str, *, request: httpx.Request, response: httpx.Response): + super().__init__(message, request=request, response=response) + + detail: ErrorBody + + @classmethod + def from_response(cls, response: httpx.Response) -> ServerResponseError | None: + if response.is_success: + return None + # 4xx or 5xx error? 
+ if 400 < (response.status_code // 100) >= 600: + return None + + if response.headers.get("content-type") != "application/json": + return None + + try: + err = msgspec.json.decode(response.read(), type=ErrorBody) + if isinstance(err.detail, list): + msg = "Remote server returned validation error" + else: + msg = err.detail.get("message", "") or "Un-parseable error" + except Exception: + err = msgspec.json.decode(response.content) + msg = "Server returned error" + + self = cls(msg, request=response.request, response=response) + self.detail = err + return self Review Comment: (in a separate pr) ########## task_sdk/src/airflow/sdk/api/datamodels/ti.py: ########## @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import uuid +from typing import Optional + +import msgspec + + +class TaskInstance(msgspec.Struct, omit_defaults=True): + id: uuid.UUID + + task_id: str + dag_id: str + run_id: str + try_number: int + map_index: Optional[int] = None # noqa: UP007 msgspec doesn't support pipe-op Review Comment: Should we default it to `-1`, or not make any such assumptions in the API? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
