https://gcc.gnu.org/g:7ee67c1388ba3830a84748a9f586f65569869c0a
commit r16-7667-g7ee67c1388ba3830a84748a9f586f65569869c0a Author: Piyush Raj <[email protected]> Date: Wed Feb 25 01:47:53 2026 +0530 contrib: add bpf-vmtest-tool to test BPF programs This patch adds the bpf-vmtest-tool subdirectory under contrib which tests BPF programs under a live kernel using a QEMU VM. It can build the specified kernel version with eBPF support enabled and stores it under $VMTEST_DIR It can test BPF C source files and precompiled BPF object files against the kernel verifier for errors. When a BPF program is rejected by the kernel verifier, the verifier logs are displayed. $ python3 main.py --log-level ERROR vmtest -k 6.15 --bpf-src fail.c BPF program failed to load Verifier logs: btf_vmlinux is malformed 0: R1=ctx() R10=fp0 0: (81) r0 = *(s32 *)(r10 +4) invalid read from stack R10 off=4 size=4 processed 1 insns (limit 1000000) max_states_per_insn 0 total_states 0 peak_states 0 mark_read 0 See the README for more examples. The script uses vmtest (https://github.com/danobi/vmtest) to boot the VM and run the program. By default, it uses the host's root ("/") as the VM rootfs via the 9p filesystem, so only the kernel is replaced during testing. Tested with Python 3.9 and above. contrib/ChangeLog: * bpf-vmtest-tool/README: New file. * bpf-vmtest-tool/bpf.py: New file. * bpf-vmtest-tool/config.py: New file. * bpf-vmtest-tool/kernel.py: New file. * bpf-vmtest-tool/main.py: New file. * bpf-vmtest-tool/pyproject.toml: New file. * bpf-vmtest-tool/tests/test_cli.py: New file. * bpf-vmtest-tool/utils.py: New file. * bpf-vmtest-tool/vm.py: New file. 
Signed-off-by: Piyush Raj <[email protected]> Diff: --- contrib/bpf-vmtest-tool/README | 248 ++++++++++++++++++++++++ contrib/bpf-vmtest-tool/bpf.py | 218 ++++++++++++++++++++++ contrib/bpf-vmtest-tool/config.py | 50 +++++ contrib/bpf-vmtest-tool/kernel.py | 301 ++++++++++++++++++++++++++++++ contrib/bpf-vmtest-tool/main.py | 285 ++++++++++++++++++++++++++++ contrib/bpf-vmtest-tool/pyproject.toml | 36 ++++ contrib/bpf-vmtest-tool/tests/test_cli.py | 219 ++++++++++++++++++++++ contrib/bpf-vmtest-tool/utils.py | 31 +++ contrib/bpf-vmtest-tool/vm.py | 175 +++++++++++++++++ 9 files changed, 1563 insertions(+) diff --git a/contrib/bpf-vmtest-tool/README b/contrib/bpf-vmtest-tool/README new file mode 100644 index 000000000000..d5648f59f6f3 --- /dev/null +++ b/contrib/bpf-vmtest-tool/README @@ -0,0 +1,248 @@ +BPF vmtest Tool +=============== +https://gcc.gnu.org/wiki/BPFRunTimeTests + +This directory contains a Python script for running BPF programs or shell commands +under a live Linux kernel using QEMU virtual machines. + + +USAGE +===== + +Before using the tool, you must set the directory where vmtest will look for +kernels and store kernel artifacts. You can do this in two ways: + +1. Set the VMTEST_DIR environment variable +2. Use the --vmtest-dir flag with each command + +Note: This is required to use the tool. + + +Available Options +----------------- + + usage: main.py [-h] [-v DEBUG|INFO|WARNING|ERROR] [--vmtest-dir DIR] {bpf,vmtest,kernel} ... + + BPF vmtest tool + + positional arguments: + {bpf,vmtest,kernel} Available commands + bpf BPF program management + vmtest Run VM tests + kernel Kernel management + + options: + -h, --help show this help message and exit + -v DEBUG|INFO|WARNING|ERROR, --log-level DEBUG|INFO|WARNING|ERROR + Log level + --vmtest-dir DIR Directory for vmtest artifacts (or set VMTEST_DIR env variable) + +COMMANDS +======== + +kernel subcommand +----------------- + +Manage kernel builds for use in virtual machines. 
You must build a +kernel before using it. + +Build a kernel: + + python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel build 6.15 + + The tool downloads and builds the specified kernel version from + https://www.kernel.org/pub/linux/kernel and stores the build artifacts in + $VMTEST_DIR/kernels/linux-6.15-x86_64. Specifically, it stores bpftool, + libbpf.a, bzImage-6.15-x86_64, and vmlinux.h, which are used when compiling + BPF programs instead of relying on the host system. + +List available kernels: + + python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel list + +Remove kernels: + + python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel remove 6.15-x86_64 + +Note: If the architecture is omitted, the host architecture is assumed. For +example, 6.15 is treated as 6.15-x86_64 on an x86_64 system. + + +vmtest subcommand +----------------- + +Run BPF programs and commands inside a QEMU virtual machine with a live kernel. + +Options: + + -k VERSION, --kernel VERSION + Kernel version to boot in the VM. Must be a kernel previously built using + the "kernel build" subcommand. The kernel version should match the format + used during build (e.g., 6.15-x86_64 or just 6.15 for host architecture). + + Required: Yes + + -r PATH, --rootfs PATH + Path to a root filesystem directory to mount in the VM. + If not specified, the host's root filesystem (/) is mounted by default. + + Required: No + Default: / (host root filesystem) + + See "Creating a Custom Rootfs" section below for how to build a rootfs from + a container image. + + --bpf-src PATH + BPF C source file to compile and load. Mutually exclusive with --bpf-obj. + + --bpf-obj PATH + Pre-compiled BPF object to load. Mutually exclusive with --bpf-src. + + -c COMMAND, --command COMMAND + Shell command to run in VM. + +At least one of --bpf-src, --bpf-obj, or --command is required. 
+ + + +Examples: + + Run a shell command inside a live kernel VM: + + python main.py vmtest -k 6.15 -c "uname -a" + + Load and run a BPF source file with custom rootfs: + + python main.py vmtest -k 6.15 --bpf-src fail.c -r $HOME/rootfs/debian-rootfs + + The tool compiles the source file using BPF_CC with BPF_CFLAGS and the + kernel-specific vmlinux.h, then generates a skeleton from the compiled BPF + object using bpftool and compiles it with a generated loader to produce + the final userspace binary. + + Load a precompiled BPF object file: + + python main.py vmtest -k 6.15 --bpf-obj fail.bpf.o + + The tool follows the same steps to generate the final userspace binary, except + it skips the BPF object compilation step. + + + +Creating a Custom Rootfs: + + You can build a rootfs from a container image using this script: + + #!/usr/bin/env bash + set -euo pipefail + + ROOT="$HOME/rootfs" + IMAGE_NAME="debian-bookworm-rootfs" + CTR_NAME="debian-rootfs-ctr" + + cat <<'EOF' | podman build -t "$IMAGE_NAME" - + FROM debian:bookworm + RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + libelf1 \ + qemu-guest-agent \ + && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + EOF + + CTR_ID=$(podman create --name "$CTR_NAME" "$IMAGE_NAME") + podman export -o "$ROOT/deb.tar" "$CTR_ID" + tar -xf "$ROOT/deb.tar" -C "$ROOT/debian-rootfs" + podman rm "$CTR_NAME" + echo "Debian rootfs ready at: $ROOT/debian-rootfs" + + After running this script, use the rootfs with: + + python main.py vmtest -k 6.15 -r $HOME/rootfs/debian-rootfs -c "uname -a" + + +bpf subcommand +-------------- + +Compile BPF source code to bytecode using kernel-specific headers. + +Options: + + compile + Compile a BPF C source file to a BPF object file. + + -k VERSION, --kernel VERSION + Kernel version to use for compilation. The tool will use the vmlinux.h + header from this kernel for the compilation. 
+ + Required: Yes + + -o OUTPUT, --output OUTPUT + Output path for the compiled BPF object file. + + Required: Yes + +Example: + + python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" bpf compile \ + invalid-memory-access.c -k 6.15 -o /tmp/invalid-memory-access.bpf.o + +The compilation stage can be modified using the BPF_CFLAGS environment variable +(default stage flag: -c) + + Example: + + BPF_CFLAGS="-O2 -E" python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" \ + bpf compile invalid-memory-access.c -k 6.15 -o /tmp/invalid-memory-access.bpf.o + +LIMITATIONS +=========== + +- Only x86_64 architecture is currently supported + + +DEPENDENCIES +============ + +- Python >= 3.9 +- vmtest >= v0.18.0 (https://github.com/danobi/vmtest) + - QEMU + - qemu-guest-agent (on guest filesystem) + +For compiling kernels: +- pahole +- See https://docs.kernel.org/process/changes.html#current-minimal-requirements + +For compiling and loading BPF programs: +- libbpf +- gcc-bpf-unknown-none (https://gcc.gnu.org/wiki/BPFBackEnd#Where_to_find_GCC_BPF) + + +BUILD FLAGS +=========== + +You can customize compiler settings using environment variables: + +- BPF_CC: Compiler for the BPF program (default: bpf-unknown-none-gcc) +- BPF_CFLAGS: Extra flags for BPF program compilation (default: "-O2 -Wall -Werror -c") +- BPF_INCLUDES: Include paths for BPF (default: "-I/usr/local/include -I/usr/include") +- VMTEST_CC: Compiler for the user-space loader (default: gcc) +- VMTEST_CFLAGS: Flags for compiling the loader (default: "-g -Wall -Werror") +- VMTEST_LDFLAGS: Linker flags for the loader (default: "-lelf -lz") + +Example usage: + + BPF_CFLAGS="-O3 -g" BPF_CC="/bpf-gcc-build/gcc/xgcc" \ + python main.py vmtest -k 6.15 --bpf-src fail.c + + +DEVELOPMENT +=========== + +Development dependencies are specified in pyproject.toml and can be installed +using any suitable Python virtual environment manager. 
+ +To run the test suite: + + python3 -m pytest diff --git a/contrib/bpf-vmtest-tool/bpf.py b/contrib/bpf-vmtest-tool/bpf.py new file mode 100644 index 000000000000..e3a98d78900c --- /dev/null +++ b/contrib/bpf-vmtest-tool/bpf.py @@ -0,0 +1,218 @@ +import re +import subprocess +import logging +from pathlib import Path +import sys +import tempfile +from typing import Optional +import utils +import config +import os + +# Based on the compilation process described in: +# https://git.sr.ht/~brianwitte/gcc-bpf-example/tree/master/item/Makefile + +logger = logging.getLogger(__name__) + + +def generate_sanitized_name(path: Path): + """generate sanitized c variable name""" + name = re.sub(r"\W", "_", path.stem) + if name and name[0].isdigit(): + name = "_" + name + return name + + +class BPFProgram: + tmp_base_dir = tempfile.TemporaryDirectory(prefix="vmtest-") + tmp_base_dir_path = Path(tmp_base_dir.name) + + def __init__( + self, + source_path: Optional[Path] = None, + bpf_bytecode_path: Optional[Path] = None, + use_temp_dir: bool = False, + ): + path = source_path or bpf_bytecode_path + self.name = generate_sanitized_name(path) + self.build_dir = self.__class__.tmp_base_dir_path / "ebpf_programs" / self.name + + if source_path: + self.bpf_src = source_path + self.bpf_obj = self.build_dir / f"{self.name}.bpf.o" + else: + self.bpf_obj = bpf_bytecode_path + self.build_dir.mkdir(parents=True, exist_ok=True) + self.bpf_skel = self.build_dir / f"{self.name}.skel.h" + self.loader_src = self.build_dir / f"{self.name}-loader.c" + self.output = self.build_dir / f"{self.name}.bin" + + @classmethod + def from_source(cls, source_path: Path, kernel_spec): + self = cls(source_path=source_path) + self._compile_bpf(kernel_spec) + self._compile_from_bpf_bytecode(kernel_spec) + return self.output + + @classmethod + def from_bpf_obj(cls, obj_path: Path, kernel_spec): + self = cls(bpf_bytecode_path=obj_path) + self._compile_from_bpf_bytecode(kernel_spec) + return self.output + + def 
compile_bpf(self, kernel_spec) -> Path: + if self.bpf_src is None: + raise ValueError( + "Cannot compile BPF source: instance was created with " + "bpf_bytecode_path instead of source_path" + ) + self._compile_bpf(kernel_spec) + return self.bpf_obj + + def _compile_from_bpf_bytecode(self, kernel_spec): + self._generate_skeleton(kernel_spec) + self._compile_loader(kernel_spec) + + def _compile_bpf(self, kernel_spec): + """Compile the eBPF program using gcc""" + logger.info(f"Compiling eBPF source: {self.bpf_src}") + cmd = [ + config.config.bpf_cc, + f"-D__TARGET_ARCH_{config.config.arch}", + "-gbtf", + "-std=gnu17", + ] + cmd.append(f"-I{kernel_spec.vmlinux_path.parent}") + cmd.extend(config.config.bpf_cflags.split(" ")) + cmd.extend(config.config.bpf_includes.split(" ")) + cmd.extend( + [ + str(self.bpf_src), + "-o", + str(self.bpf_obj), + ] + ) + logger.debug("".join(cmd)) + try: + utils.run_command(cmd, stream_output=True) + except subprocess.CalledProcessError as e: + logger.error(f"bpf compilation failed: {e}") + sys.exit(1) + logger.info(f"eBPF compiled: {self.bpf_obj}") + + def _generate_skeleton(self, kernel_spec): + """Generate the BPF skeleton header using bpftool""" + logger.info(f"Generating skeleton: {self.bpf_skel}") + cmd = [ + kernel_spec.bpftool_path, + "gen", + "skeleton", + str(self.bpf_obj), + "name", + self.name, + ] + try: + result = utils.run_command(cmd) + with open(self.bpf_skel, "w") as f: + f.write(result.stdout) + logger.info("Skeleton generated.") + except subprocess.CalledProcessError: + logger.error("Failed to generate skeleton.") + sys.exit(1) + + def _compile_loader(self, kernel_spec): + """Compile the C loader program""" + self._generate_loader() + logger.info(f"Compiling loader: {self.loader_src}") + cmd = [ + config.config.vmtest_cc, + *config.config.vmtest_cflags.split(" "), + "-I", + str(self.build_dir), + str(self.loader_src), + kernel_spec.libbpf_path, + *config.config.vmtest_ldflags.split(" "), + "-o", + str(self.output), + ] 
+ # remove variables that conflict with host compiler + clean_env = os.environ.copy() + clean_env.pop("GCC_EXEC_PREFIX", None) + try: + utils.run_command(cmd, env=clean_env, stream_output=True) + except subprocess.CalledProcessError as e: + logger.error(f"bpf loader compilation failed: {e}") + sys.exit(1) + + logger.info("Compilation complete") + + def _generate_loader(self): + """ + Generate a loader C file for the given BPF skeleton. + + Args: + bpf_name (str): Name of the BPF program (e.g. "prog"). + output_path (str): Path to write loader.c. + """ + skeleton_header = f"{self.name}.skel.h" + loader_code = f"""\ + #include <stdio.h> + #include <stdlib.h> + #include <signal.h> + #include <unistd.h> + #include <bpf/libbpf.h> + #include "{skeleton_header}" + + #define LOG_BUF_SIZE 1024 * 1024 + + static volatile sig_atomic_t stop; + static char log_buf[LOG_BUF_SIZE]; + + void handle_sigint(int sig) {{ + stop = 1; + }} + + int main() {{ + struct {self.name} *skel; + struct bpf_program *prog; + int err; + + signal(SIGINT, handle_sigint); + + skel = {self.name}__open(); // STEP 1: open only + if (!skel) {{ + fprintf(stderr, "Failed to open BPF skeleton\\n"); + return 1; + }} + + // STEP 2: Get the bpf_program object for the main program + bpf_object__for_each_program(prog, skel->obj) {{ + bpf_program__set_log_buf(prog, log_buf, sizeof(log_buf)); + bpf_program__set_log_level(prog, 1); // optional: verbose logs + }} + + // STEP 3: Load the program (this will trigger verifier log output) + err = {self.name}__load(skel); + fprintf( + stderr, + "--- Verifier log start ---\\n" + "%s\\n" + "--- Verifier log end ---\\n", + log_buf + ); + if (err) {{ + fprintf(stderr, "Failed to load BPF skeleton: %d\\n", err); + {self.name}__destroy(skel); + return 1; + }} + + printf("BPF program loaded successfully.\\n"); + + {self.name}__destroy(skel); + return 0; + }} + + """ + with open(self.loader_src, "w") as f: + f.write(loader_code) + logger.info(f"Generated loader at 
{self.loader_src}") diff --git a/contrib/bpf-vmtest-tool/config.py b/contrib/bpf-vmtest-tool/config.py new file mode 100644 index 000000000000..6ef3b7e18b4b --- /dev/null +++ b/contrib/bpf-vmtest-tool/config.py @@ -0,0 +1,50 @@ +import platform +from pathlib import Path +import os +from dataclasses import dataclass + + +@dataclass +class VMTestConfig: + """Configuration for BPF vmtest tool""" + + vmtest_dir: Path + kernel_tarball_url: str = "https://cdn.kernel.org/pub/linux/kernel/" + arch: str = platform.machine() + vmtest_cc: str = os.getenv("VMTEST_CC", "gcc") + vmtest_cflags: str = os.getenv("VMTEST_CFLAGS", "-g -Wall -Werror ") + vmtest_ldflags: str = os.getenv("VMTEST_LDFLAGS", "-lelf -lz") + bpf_cc: str = os.getenv("BPF_CC", "bpf-unknown-none-gcc") + bpf_cflags: str = os.getenv("BPF_CFLAGS", "-O2 -Wall -Werror -c") + bpf_includes: str = os.getenv("BPF_INCLUDES", "-I/usr/local/include -I/usr/include") + + @property + def kconfig_rel_paths(self) -> list: + """Kernel config paths relative to kernel directory""" + return [ + "tools/testing/selftests/bpf/config", + "tools/testing/selftests/bpf/config.vm", + f"tools/testing/selftests/bpf/config.{self.arch}", + ] + + @property + def kernels_dir(self) -> Path: + """Get kernels directory""" + return self.vmtest_dir / "kernels" + + def __post_init__(self): + """Validate vmtest_dir exists""" + if not self.vmtest_dir.exists(): + raise ValueError(f"VMTEST_DIR does not exist: {self.vmtest_dir}") + if not self.vmtest_dir.is_dir(): + raise ValueError(f"VMTEST_DIR is not a directory: {self.vmtest_dir}") + + +# Global config instance +config = None + + +def init_config(vmtest_dir: str): + """Initialize global config""" + global config + config = VMTestConfig(vmtest_dir=Path(vmtest_dir)) diff --git a/contrib/bpf-vmtest-tool/kernel.py b/contrib/bpf-vmtest-tool/kernel.py new file mode 100644 index 000000000000..de7d2eb0e066 --- /dev/null +++ b/contrib/bpf-vmtest-tool/kernel.py @@ -0,0 +1,301 @@ +import logging +import os +import 
shutil +import subprocess +from pathlib import Path +import re +import sys +from urllib.parse import urljoin +from urllib.request import urlretrieve +from typing import Optional, List +from dataclasses import dataclass + +import config +import utils + +logger = logging.getLogger(__name__) + + +@dataclass +class KernelSpec: + """Immutable kernel specification""" + + version: str + arch: str | None = None + + def __post_init__(self): + if self.arch is None or self.arch == "": + self.arch = config.config.arch + self.major = self.version.split(".")[0] + + def __str__(self): + return f"{self.version}-{self.arch}" + + @property + def kernel_build_dir(self) -> Path: + return config.config.kernels_dir / f"linux-{self}-build" + + @property + def kernel_dir(self) -> Path: + return config.config.kernels_dir / f"linux-{self}" + + @property + def bzimage_path(self) -> Path: + return self.kernel_dir / f"bzImage-{self}" + + @property + def bpftool_path(self) -> Path: + return self.kernel_dir / "bpftool" + + @property + def libbpf_path(self) -> Path: + return self.kernel_dir / "libbpf.a" + + @property + def vmlinux_path(self) -> Path: + return self.kernel_dir / "vmlinux.h" + + @property + def tarball_path(self) -> Path: + return config.config.kernels_dir / f"linux-{self}.tar.xz" + + +class KernelImage: + """Represents a compiled kernel image""" + + def __init__(self, path: Path): + if not isinstance(path, Path): + path = Path(path) + + if not path.exists(): + raise FileNotFoundError(f"Kernel image not found: {path}") + + self.path = path + + def __str__(self): + return str(self.path) + + +class KernelCompiler: + """Handles complete kernel compilation process including download and build""" + + @staticmethod + def _progress_hook(block_num: int, block_size: int, total_size: int) -> None: + """Progress hook for urlretrieve to display download progress""" + if total_size <= 0: + return + + downloaded = block_num * block_size + percent = min(downloaded * 100 // total_size, 100) + 
bar_length = 10 + filled = int(bar_length * downloaded // total_size) + bar = "#" * filled + "-" * (bar_length - filled) + + if logger.getEffectiveLevel() <= logging.INFO: + downloaded_mb = downloaded / (1024 * 1024) + total_mb = total_size / (1024 * 1024) + + if sys.stdout.isatty(): + sys.stdout.write( + f"\rDownloading: |{bar}| {percent}% ({downloaded_mb:.2f}MB / {total_mb:.2f}MB)" + ) + sys.stdout.flush() + if downloaded >= total_size: + sys.stdout.write("\n") + sys.stdout.flush() + else: + if downloaded >= total_size or downloaded % (block_size * 100) == 0: + logger.info( + f"Downloading: {percent}% ({downloaded_mb:.2f}MB / {total_mb:.2f}MB)" + ) + + def compile_from_version(self, spec: KernelSpec) -> KernelImage: + """Complete compilation process from kernel version""" + if spec.bzimage_path.exists(): + logger.info(f"Kernel {spec} already exists, skipping compilation") + return KernelImage(spec.bzimage_path) + + try: + self._download_source(spec) + self._extract_source(spec) + self._configure_kernel(spec) + self._compile_kernel(spec) + self._copy_bzimage(spec) + + logger.info(f"Successfully compiled kernel {spec}") + return KernelImage(spec.bzimage_path) + + except Exception as e: + logger.error(f"Failed to compile kernel {spec}: {e}") + sys.exit(1) + finally: + # Always cleanup temporary files + self._cleanup(spec) + + def _download_source(self, spec: KernelSpec) -> None: + """Download kernel source tarball""" + if spec.tarball_path.exists(): + logger.info(f"Tarball already exists: {spec.tarball_path}") + return + + url_suffix = f"v{spec.major}.x/linux-{spec.version}.tar.xz" + url = urljoin(config.config.kernel_tarball_url, url_suffix) + + logger.info(f"Downloading kernel from {url}") + spec.tarball_path.parent.mkdir(parents=True, exist_ok=True) + urlretrieve(url, spec.tarball_path, reporthook=self._progress_hook) + logger.info("Kernel source downloaded") + + def _extract_source(self, spec: KernelSpec) -> None: + """Extract kernel source tarball""" + 
logger.info(f"Extracting kernel source to {spec.kernel_build_dir}") + spec.kernel_build_dir.mkdir(parents=True, exist_ok=True) + + utils.run_command( + [ + "tar", + "-xf", + str(spec.tarball_path), + "-C", + str(spec.kernel_build_dir), + "--strip-components=1", + ] + ) + + def _configure_kernel(self, spec: KernelSpec) -> None: + """Configure kernel with provided config files""" + config_path = spec.kernel_build_dir / ".config" + + with open(config_path, "wb") as kconfig: + for config_rel_path in config.config.kconfig_rel_paths: + config_abs_path = spec.kernel_build_dir / config_rel_path + if config_abs_path.exists(): + with open(config_abs_path, "rb") as conf: + kconfig.write(conf.read()) + + logger.info("Updated kernel configuration") + + def _compile_kernel(self, spec: KernelSpec) -> None: + """Compile the kernel""" + logger.info(f"Compiling kernel in {spec.kernel_build_dir}") + old_cwd = os.getcwd() + + try: + os.chdir(spec.kernel_build_dir) + # pahole is required for the DEBUG_INFO_BTF kernel configuration option. + pahole_path = shutil.which("pahole") + if pahole_path is None: + logger.error( + "pahole not found in PATH. BTF generation requires pahole v1.16 or later." 
+ ) + sys.exit(1) + friendly_cores = os.cpu_count() - 2 + utils.run_command(["make", "olddefconfig"], stream_output=True) + logger.info("Starting kernel compilation") + utils.run_command( + ["make", f"-j{friendly_cores}", "bzImage"], stream_output=True + ) + logger.info("Compiling bpftool") + utils.run_command( + ["make", "-C", "tools/bpf", f"-j{friendly_cores}", "bpftool"], + stream_output=True, + ) + logger.info("Compiling libbpf") + utils.run_command( + ["make", "-C", "tools/lib/bpf", f"-j{friendly_cores}"], + stream_output=True, + ) + except subprocess.CalledProcessError as e: + logger.error(f"Kernel compilation failed: {e}") + sys.exit(1) + finally: + os.chdir(old_cwd) + + def _copy_bzimage(self, spec: KernelSpec) -> None: + """Copy compiled bzImage to final location""" + # compile the bpftool as well + src = spec.kernel_build_dir / "arch/x86/boot/bzImage" + dest = spec.bzimage_path + dest.parent.mkdir(parents=True, exist_ok=True) + + shutil.copy2(src, dest) + logger.info(f"Stored bzImage at {dest}") + + shutil.copy2( + spec.kernel_build_dir / "tools/bpf/bpftool/vmlinux.h", spec.vmlinux_path + ) + logger.info(f"Stored vmlinux at {spec.vmlinux_path}") + + shutil.copy2( + spec.kernel_build_dir / "tools/bpf/bpftool/bpftool", spec.bpftool_path + ) + logger.info(f"Stored bpftool at {spec.bpftool_path}") + + shutil.copy2(spec.kernel_build_dir / "tools/lib/bpf/libbpf.a", spec.libbpf_path) + logger.info(f"Stored libbpf at {spec.libbpf_path}") + + def _cleanup(self, spec: KernelSpec) -> None: + """Clean up temporary files""" + if spec.tarball_path.exists(): + spec.tarball_path.unlink() + logger.info("Removed tarball") + + if spec.kernel_build_dir.exists(): + shutil.rmtree(spec.kernel_build_dir) + logger.info("Removed kernel source directory") + + +class KernelManager: + """Main interface for kernel management""" + + def __init__(self): + self.compiler = KernelCompiler() + + def remove_kernel(self, name: str) -> None: + """Remove compiled kernel by version""" + version, 
_, arch = name.partition("-") + spec = KernelSpec(version=version, arch=arch) + if spec.kernel_dir.exists(): + shutil.rmtree(spec.kernel_dir) + logger.info(f"Removed kernel {spec}") + else: + logger.error( + f"Kernel {spec} does not exist, path {spec.kernel_dir} not found" + ) + raise SystemExit(1) + + def build_kernel(self, version: str, arch=None) -> None: + """Build kernel from version""" + + spec = KernelSpec(version=version, arch=arch) + self.compiler.compile_from_version(spec) + + @staticmethod + def get_kernel_from_version( + version: str, + ): + """Get kernel image from version""" + version, _, arch = version.partition("-") + spec = KernelSpec(version=version, arch=arch) + if spec.bzimage_path.exists(): + return spec, KernelImage(spec.bzimage_path) + else: + raise FileNotFoundError( + f"Kernel {spec} not found. Use 'main.py kernel build' to create it." + ) + + def list_kernels(self) -> List[str]: + """List all available compiled kernels""" + if not config.config.kernels_dir.exists(): + raise FileNotFoundError( + f"Kernels directory not found: {config.config.kernels_dir}" + ) + + kernels = [] + for file in config.config.kernels_dir.glob("linux-*"): + if file.is_dir(): + match = re.match(r"linux-(.*)", file.name) + if match: + kernels.append(match.group(1)) + + return sorted(kernels) diff --git a/contrib/bpf-vmtest-tool/main.py b/contrib/bpf-vmtest-tool/main.py new file mode 100644 index 000000000000..43b6036c6150 --- /dev/null +++ b/contrib/bpf-vmtest-tool/main.py @@ -0,0 +1,285 @@ +import argparse +import logging +from pathlib import Path +import sys +import textwrap +import os + +import bpf +import kernel +import vm +import config + +logger = logging.getLogger(__name__) + + +def cmd_kernel_list(args): + """List all available kernels""" + kmanager = kernel.KernelManager() + kernels = kmanager.list_kernels() + if kernels: + for k in kernels: + print(k) + else: + logger.info("No kernels available") + + +def cmd_kernel_remove(args): + """Remove a kernel""" + 
kmanager = kernel.KernelManager() + if not args.kernel: + logger.error("kernel version required for remove action") + sys.exit(1) + kmanager.remove_kernel(args.kernel) + logger.info(f"Kernel {args.kernel} removed") + print(f"Kernel {args.kernel} removed") + + +def cmd_kernel_build(args): + """Build a kernel""" + kmanager = kernel.KernelManager() + if not args.kernel: + logger.error("kernel version required for build action") + sys.exit(1) + kmanager.build_kernel(version=args.kernel) + + +def cmd_bpf_compile(args): + """Compile BPF source to bytecode only""" + kmanager = kernel.KernelManager() + + try: + kernel_spec, _ = kmanager.get_kernel_from_version(version=args.kernel) + except Exception as e: + logger.error(f"Failed to get kernel: {e}") + sys.exit(1) + + try: + bpf_program = bpf.BPFProgram(source_path=Path(args.bpf_src)) + output_path = bpf_program.compile_bpf(kernel_spec) + + if args.output: + import shutil + + output_dest = Path(args.output) + shutil.copy2(output_path, output_dest) + logger.info(f"Copied to: {output_dest}") + + except Exception as e: + logger.error(f"Failed to compile BPF source: {e}") + sys.exit(1) + + +def cmd_vmtest(args): + """Handle vmtest subcommand""" + kmanager = kernel.KernelManager() + + try: + kernel_spec, kernel_image = kmanager.get_kernel_from_version( + version=args.kernel + ) + except Exception as e: + logger.error(f"Failed to get kernel: {e}") + sys.exit(1) + + try: + if args.bpf_src: + command = bpf.BPFProgram.from_source(Path(args.bpf_src), kernel_spec) + elif args.bpf_obj: + command = bpf.BPFProgram.from_bpf_obj(Path(args.bpf_obj), kernel_spec) + elif args.command: + command = args.command + except Exception as e: + logger.error(f"Failed to prepare command for vmtest: {e}") + sys.exit(1) + + virtual_machine = vm.VirtualMachine(kernel_image, args.rootfs, str(command)) + try: + result = virtual_machine.execute() + except vm.BootFailedError as e: + logger.error(f"VM boot failure: {e}") + sys.exit(e.returncode) + + if 
args.bpf_src or args.bpf_obj: + if result.returncode == 0: + print("BPF programs successfully loaded") + else: + if "Failed to load BPF skeleton" in result.stdout: + print("BPF program failed to load") + print("Verifier logs:") + print(textwrap.indent(vm.bpf_verifier_logs(result.stdout), "\t")) + elif args.command: + print(result.stdout) + + sys.exit(result.returncode) + + +def main(): + parser = argparse.ArgumentParser( + description="BPF vmtest tool", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent(""" + Examples: + # Compile BPF source to bytecode + %(prog)s bpf compile my_prog.bpf.c -o my_prog.bpf.o + + # Run BPF program in VM + %(prog)s vmtest --kernel 6.15-x86_64 --bpf-src my_prog.bpf.c + + # List available kernels + %(prog)s kernel list + """), + ) + + parser.add_argument( + "-v", + "--log-level", + help="Log level", + metavar="DEBUG|INFO|WARNING|ERROR", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + default="INFO", + ) + + parser.add_argument( + "--vmtest-dir", + help="Directory for vmtest artifacts (or set VMTEST_DIR env variable)", + metavar="DIR", + type=str, + default=os.getenv("VMTEST_DIR"), + ) + + subparsers = parser.add_subparsers(dest="subcommand", help="Available commands") + + # BPF subcommand + bpf_subparser = subparsers.add_parser("bpf", help="BPF program management") + bpf_subparsers = bpf_subparser.add_subparsers(dest="bpf_action", help="BPF actions") + + # bpf compile subcommand + compile_parser = bpf_subparsers.add_parser( + "compile", help="Compile BPF source to bytecode (.bpf.o)" + ) + compile_parser.add_argument( + "bpf_src", + help="Path to BPF C source file", + type=str, + ) + compile_parser.add_argument( + "-o", + "--output", + help="Output path for compiled bytecode (optional, defaults to temp dir)", + metavar="PATH", + type=str, + required=True, + ) + compile_parser.add_argument( + "-k", + "--kernel", + help="Kernel version to use for compilation", + metavar="VERSION", + type=str, + 
required=True, + ) + + compile_parser.set_defaults(func=cmd_bpf_compile) + + # VMtest subcommand + vmtest_parser = subparsers.add_parser("vmtest", help="Run VM tests") + vmtest_parser.set_defaults(func=cmd_vmtest) + + vmtest_parser.add_argument( + "-k", + "--kernel", + help="Kernel version to boot in the vm", + metavar="VERSION", + type=str, + required=True, + ) + vmtest_parser.add_argument( + "-r", "--rootfs", help="rootfs to mount in the vm", default="/", metavar="PATH" + ) + command_group = vmtest_parser.add_mutually_exclusive_group(required=True) + command_group.add_argument( + "--bpf-src", + help="Path to BPF C source file", + metavar="PATH", + type=str, + ) + command_group.add_argument( + "--bpf-obj", + help="Path to bpf bytecode object", + metavar="PATH", + type=str, + ) + command_group.add_argument( + "-c", "--command", help="command to run in the vm", metavar="COMMAND" + ) + + # Kernel subcommand with nested subcommands + kernel_subparser = subparsers.add_parser("kernel", help="Kernel management") + kernel_subparsers = kernel_subparser.add_subparsers( + dest="kernel_action", help="Kernel actions" + ) + + # kernel list subcommand + list_parser = kernel_subparsers.add_parser( + "list", help="List all available kernels" + ) + list_parser.set_defaults(func=cmd_kernel_list) + + # kernel remove subcommand + remove_parser = kernel_subparsers.add_parser("remove", help="Remove a kernel") + remove_parser.add_argument( + "kernel", help="Kernel version to remove (e.g., 6.15-x86_64)" + ) + remove_parser.set_defaults(func=cmd_kernel_remove) + + # kernel build subcommand + build_parser = kernel_subparsers.add_parser("build", help="Build a kernel") + build_parser.add_argument( + "kernel", help="Kernel version to build (e.g. 
6.15-x86_64)" + ) + build_parser.set_defaults(func=cmd_kernel_build) + + args = parser.parse_args() + logging.basicConfig(level=args.log_level, format="%(levelname)s: %(message)s") + + if not args.vmtest_dir: + logger.error( + "VMTEST_DIR not specified. Use --vmtest-dir=DIR or set VMTEST_DIR environment variable" + ) + sys.exit(1) + + vmtest_path = Path(args.vmtest_dir) + if not vmtest_path.exists(): + logger.error(f"VMTEST_DIR does not exist: {vmtest_path}") + sys.exit(1) + + if not vmtest_path.is_dir(): + logger.error(f"VMTEST_DIR is not a directory: {vmtest_path}") + sys.exit(1) + + try: + config.init_config(vmtest_dir=args.vmtest_dir) + except ValueError as e: + logger.error(str(e)) + sys.exit(1) + + logger.debug(f"VMTEST_DIR set to: {args.vmtest_dir}") + + if hasattr(args, "func"): + args.func(args) + sys.exit(0) + else: + parser.print_help() + sys.exit(1) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + logger.error("Operation cancelled by user") + sys.exit(1) + except Exception as e: + logger.error(f"Unknown error: {e}") + sys.exit(1) diff --git a/contrib/bpf-vmtest-tool/pyproject.toml b/contrib/bpf-vmtest-tool/pyproject.toml new file mode 100644 index 000000000000..cd228cf0e5a9 --- /dev/null +++ b/contrib/bpf-vmtest-tool/pyproject.toml @@ -0,0 +1,36 @@ +[project] +name = "bpf-vmtest-tool" +version = "0.1.0" +description = "Test BPF code against live kernels" +readme = "README.md" +requires-python = ">=3.9" +dependencies = [] + +[dependency-groups] +dev = [ + "pre-commit>=4.2.0", + "pytest>=8.4.0", + "pytest-sugar>=1.0.0", + "ruff>=0.11.13", + "tox>=4.26.0", +] + +[tool.pytest.ini_options] +addopts = [ + "--import-mode=importlib", +] +testpaths = ["tests"] +pythonpath = ["."] + +[tool.ruff.lint] +select = [ + # pycodestyle + "E", + # Pyflakes + "F", +] +# Allow fix for all enabled rules (when `--fix`) is provided. +fixable = ["ALL"] + +# Allow unused variables when underscore-prefixed. 
# (The preceding pyproject.toml hunk ends with the ruff setting:
#   dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" )
#
# ---------------------------------------------------------------------------
# contrib/bpf-vmtest-tool/tests/test_cli.py
# ---------------------------------------------------------------------------
"""End-to-end tests that drive ``main.main()`` through its command line."""

import sys
from unittest import mock
import pytest
from bpf import BPFProgram
import kernel
import main
import logging
import config
import os

logger = logging.getLogger(__name__)


@pytest.fixture
def tmp_config():
    """Initialise the tool configuration from the VMTEST_DIR environment.

    The directory is taken from the environment rather than hard-coded so the
    suite can run on any machine, not only the original author's.
    """
    vmtest_dir = os.getenv("VMTEST_DIR")
    assert vmtest_dir, "Specify VMTEST_DIR environment variable"
    config.init_config(vmtest_dir=vmtest_dir)


# reset config for every test
@pytest.fixture(autouse=True)
def reset_config():
    """Clear the module-level configuration before each test."""
    config.config = None


@pytest.fixture
def openat_bpf_source(tmp_path):
    """Write a BPF C program hooking sys_enter_openat and return its path."""
    openat_bpf = tmp_path / "openat_bpf.c"
    openat_bpf.write_text(r"""
    #include "vmlinux.h"
    #include <bpf/bpf_helpers.h>
    #include <bpf/bpf_tracing.h>
    #include <bpf/bpf_core_read.h>

    char LICENSE[] SEC("license") = "GPL";

    int example_pid = 0;

    SEC("tracepoint/syscalls/sys_enter_openat")
    int handle_openat(struct trace_event_raw_sys_enter *ctx)
    {
        int pid = bpf_get_current_pid_tgid() >> 32;
        char filename[256]; // filename buffer
        bpf_probe_read_user(&filename, sizeof(filename), (void *)ctx->args[1]);
        bpf_printk("sys_enter_openat() called from PID %d for file: %s\n", pid,
            filename);

        return 0;
    }

    """)
    return openat_bpf


@pytest.fixture
def openat_bpf_obj(openat_bpf_source, tmp_config):
    """Return a factory that compiles the openat program for a kernel spec."""

    def _create_openat_bpf_obj(kernel_spec):
        bpf_program = BPFProgram(source_path=openat_bpf_source)
        bpf_program._compile_bpf(kernel_spec)
        return bpf_program.bpf_obj

    return _create_openat_bpf_obj


@pytest.fixture
def invalid_memory_access_bpf_source(tmp_path):
    """Write a BPF C program with an out-of-bounds read and return its path."""
    invalid_memory_access_bpf = tmp_path / "invalid_memory_access_bpf.c"
    invalid_memory_access_bpf.write_text(r"""
    #include "vmlinux.h"
    #include <bpf/bpf_helpers.h>
    #include <bpf/bpf_tracing.h>

    char LICENSE[] SEC("license") = "GPL";

    SEC("tracepoint/syscalls/sys_enter_openat")
    int bpf_prog(struct trace_event_raw_sys_enter *ctx) {
        int arr[4] = {1, 2, 3, 4};

        // Invalid memory access: out-of-bounds
        int val = arr[5]; // This causes the verifier to fail

        return val;
    }
    """)
    return invalid_memory_access_bpf


@pytest.fixture
def invalid_memory_access_bpf_obj(invalid_memory_access_bpf_source, tmp_config):
    """Return a factory that compiles the invalid program for a kernel spec."""

    def _create_invalid_memory_access_bpf_obj(kernel_spec):
        bpf_program = BPFProgram(source_path=invalid_memory_access_bpf_source)
        bpf_program._compile_bpf(kernel_spec)
        return bpf_program.bpf_obj

    return _create_invalid_memory_access_bpf_obj


def run_main_with_args_and_capture_output(args, capsys):
    """Run ``main.main()`` with ``sys.argv`` set to *args*.

    Returns:
        ``(exit_code, stdout, stderr)`` taken from the ``SystemExit`` that
        ``main`` raises, with trailing whitespace stripped from both streams.
        The test fails if ``main`` returns normally or raises anything else.
    """
    with mock.patch.object(sys, "argv", args):
        try:
            main.main()
        except SystemExit as e:
            result = capsys.readouterr()
            output = result.out.rstrip()
            error = result.err.rstrip()
            logger.debug("STDOUT:\n%s", output)
            logger.debug("STDERR:\n%s", error)
            return (e.code, output, error)
        except Exception as e:
            pytest.fail(f"Unknown error happened: {e}")
        else:
            pytest.fail("Expected main to raise SystemExit")


KERNEL_VERSION = "6.16"
kernel_cli_flags = [["--kernel", KERNEL_VERSION]]


@pytest.mark.parametrize("kernel_args", kernel_cli_flags)
class TestCLI:
    """CLI scenarios, each run once per entry in ``kernel_cli_flags``."""

    def test_main_with_valid_bpf(self, kernel_args, openat_bpf_source, capsys):
        args = [
            "main.py",
            "vmtest",
            *kernel_args,
            "--rootfs",
            "/",
            "--bpf-src",
            str(openat_bpf_source),
        ]
        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
        assert code == 0
        assert "BPF programs successfully loaded" == output

    def test_main_with_valid_bpf_obj(self, kernel_args, openat_bpf_obj, capsys):
        args = [
            "main.py",
            "vmtest",
            *kernel_args,
            "--rootfs",
            "/",
            "--bpf-obj",
            str(openat_bpf_obj(kernel.KernelSpec(kernel_args[1]))),
        ]
        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
        assert code == 0
        assert "BPF programs successfully loaded" == output

    def test_main_with_invalid_bpf(
        self, kernel_args, invalid_memory_access_bpf_source, capsys
    ):
        args = [
            "main.py",
            "vmtest",
            *kernel_args,
            "--rootfs",
            "/",
            "--bpf-src",
            str(invalid_memory_access_bpf_source),
        ]
        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
        output_lines = output.splitlines()
        assert code == 1
        assert "BPF program failed to load" == output_lines[0]
        assert "Verifier logs:" == output_lines[1]

    def test_main_with_invalid_bpf_obj(
        self, kernel_args, invalid_memory_access_bpf_obj, capsys
    ):
        args = [
            "main.py",
            "vmtest",
            *kernel_args,
            "--rootfs",
            "/",
            "--bpf-obj",
            str(invalid_memory_access_bpf_obj(kernel.KernelSpec(kernel_args[1]))),
        ]
        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
        output_lines = output.splitlines()
        assert code == 1
        assert "BPF program failed to load" == output_lines[0]
        assert "Verifier logs:" == output_lines[1]

    def test_main_with_valid_command(self, kernel_args, capsys):
        args = ["main.py", "vmtest", *kernel_args, "--rootfs", "/", "-c", "uname -r"]
        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
        assert code == 0
        assert f"{kernel_args[1]}.0" in output

    def test_main_with_invalid_command(self, kernel_args, capsys):
        args = [
            "main.py",
            "vmtest",
            *kernel_args,
            "--rootfs",
            "/",
            "-c",
            "NotImplemented",
        ]
        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
        assert code != 0
        assert f"Command failed with exit code: {code}" in output

    def test_bpf_compile_subcommand(
        self, kernel_args, openat_bpf_source, tmp_path, capsys
    ):
        # NOTE(review): the output path is an empty string and tmp_path is
        # unused. Presumably "" is meant to exercise a default output
        # location; if not, pass a path under tmp_path instead -- confirm
        # against cmd_bpf_compile.
        args = [
            "main.py",
            "bpf",
            "compile",
            *kernel_args,
            "-o",
            "",
            str(openat_bpf_source),
        ]
        code, _, _ = run_main_with_args_and_capture_output(args, capsys)
        assert code == 0
# ---------------------------------------------------------------------------
# contrib/bpf-vmtest-tool/utils.py
# ---------------------------------------------------------------------------
import subprocess
import logging
from typing import Any

logger = logging.getLogger(__name__)


def run_command(cmd: list[str], stream_output: bool = False, **kwargs: Any):
    """Run *cmd* as a subprocess and return the completed process.

    Args:
        cmd: Argument vector. Every item is converted with ``str()``; items
            that are empty or whitespace-only are dropped.
        stream_output: When true, the child's stdout/stderr are not captured.
        **kwargs: Forwarded unchanged to ``subprocess.run``.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero. The
            failure (and any captured output) is logged before re-raising.
    """
    cleaned_cmd = [str(item) for item in cmd if str(item).strip()]
    capture_cmd_output = not stream_output
    logger.debug("running command: %s", cleaned_cmd)
    try:
        result = subprocess.run(
            cleaned_cmd,
            text=True,
            check=True,
            capture_output=capture_cmd_output,
            shell=False,
            **kwargs,
        )
    except subprocess.CalledProcessError as e:
        logger.error(e)
        if capture_cmd_output:
            # Guard against None so logging a failure can never itself fail.
            logger.error("Command failed with stdout: %s", (e.stdout or "").strip())
            logger.error("Command failed with stderr: %s", (e.stderr or "").strip())
        raise
    if capture_cmd_output:
        logger.debug("Command stdout:\n%s", (result.stdout or "").strip())
        if result.stderr:
            logger.debug("Command stderr:\n%s", result.stderr.strip())
    return result


# ---------------------------------------------------------------------------
# contrib/bpf-vmtest-tool/vm.py
# ---------------------------------------------------------------------------
import logging
from pathlib import Path
import subprocess
from typing import TYPE_CHECKING, List, Optional

if TYPE_CHECKING:
    # KernelImage is referenced only in annotations, so it is not needed at
    # runtime.
    from kernel import KernelImage

logger = logging.getLogger(__name__)


class VMConfig:
    """Configuration container for VM settings"""

    def __init__(
        self, kernel_image: "KernelImage", rootfs_path: str, command: str, **kwargs
    ):
        """Store the VM settings.

        Args:
            kernel_image: Kernel to boot; its ``path`` attribute is also kept
                as a string in ``kernel_path``.
            rootfs_path: Root filesystem passed to the hypervisor.
            command: Command to run inside the VM.
            **kwargs: Optional ``memory_mb`` (default 512), ``cpu_count``
                (default 1) and ``extra_args`` (default ``{}``).
        """
        self.kernel = kernel_image
        self.kernel_path = str(kernel_image.path)
        self.rootfs_path = rootfs_path
        self.command = command
        self.memory_mb = kwargs.get("memory_mb", 512)
        self.cpu_count = kwargs.get("cpu_count", 1)
        self.extra_args = kwargs.get("extra_args", {})


def bpf_verifier_logs(output: str) -> str:
    """Return the text between the verifier-log markers in *output*.

    The end marker is searched for only after the start marker, so a stray
    end marker earlier in the output cannot produce an empty or wrong slice.

    Returns:
        The stripped log body, or a fixed "not found" message when either
        marker is missing.
    """
    start_tag = "--- Verifier log start ---"
    end_tag = "--- Verifier log end ---"
    not_found = "No verifier log found in the output."

    start_idx = output.find(start_tag)
    if start_idx == -1:
        return not_found

    body_start = start_idx + len(start_tag)
    end_idx = output.find(end_tag, body_start)
    if end_idx == -1:
        return not_found

    # Extract between the tags (excluding the markers themselves)
    return output[body_start:end_idx].strip()


class Vmtest:
    """vmtest backend implementation"""

    # Line that separates boot output from the output of the guest command.
    _RUN_MARKER = "===> Running command"

    def __init__(self):
        pass

    def _boot_command(self, vm_config: VMConfig):
        """Build the ``vmtest`` argument vector for *vm_config*."""
        vmtest_command = ["vmtest"]
        vmtest_command.extend(["-k", vm_config.kernel_path])
        vmtest_command.extend(["-r", vm_config.rootfs_path])
        # If it is a compiled BPF program, use the mounted path inside the VM
        if vm_config.command.endswith(".bin"):
            vmtest_command.append("/mnt/vmtest/" + Path(vm_config.command).name)
        else:
            vmtest_command.append(vm_config.command)
        return vmtest_command

    def _remove_boot_log(self, full_output: str) -> str:
        """
        Filters QEMU and kernel boot logs, returning only the output after the
        `===> Running command` marker.
        """
        lines = full_output.splitlines()
        for i, line in enumerate(lines):
            if self._RUN_MARKER in line:
                # Return everything after that marker (excluding the marker itself)
                return "\n".join(lines[i + 1 :]).strip()
        return full_output.strip()

    @staticmethod
    def _working_dir(command: str) -> Optional[Path]:
        """Return the directory to launch vmtest from, or None to inherit.

        *command* may be a path to a program or an arbitrary shell command.
        For a shell command containing a slash (e.g. ``cat /proc/version``)
        the "parent" is not a real directory; passing it as ``cwd`` would
        make ``subprocess.run`` raise ``FileNotFoundError``.
        """
        # NOTE(review): presumably vmtest exposes its working directory to
        # the guest at /mnt/vmtest, which is why the program's directory is
        # used -- confirm against the vmtest documentation.
        parent = Path(command).parent
        return parent if parent.is_dir() else None

    def run_command(self, vm_config):
        """Boot the VM, run the configured command and return its result.

        Returns:
            A ``VMCommandResult`` with the exit code and the output that
            follows the run marker. A failing guest command still counts as a
            successful boot and is returned, not raised.

        Raises:
            BootFailedError: If ``vmtest`` cannot be executed, or it fails
                before the run marker appears in its output.
        """
        logger.info(f"Booting VM with kernel: {vm_config.kernel_path}")
        logger.info(f"Using rootfs: {vm_config.rootfs_path}")
        try:
            vm = subprocess.run(
                self._boot_command(vm_config),
                check=True,
                text=True,
                capture_output=True,
                shell=False,
                cwd=self._working_dir(vm_config.command),
            )
        except FileNotFoundError as e:
            raise BootFailedError(
                "vmtest command not found in PATH. Please ensure vmtest is installed and available in your system PATH."
            ) from e
        except subprocess.CalledProcessError as e:
            out = e.stdout or ""
            err = e.stderr or ""
            # when the command in the vm fails we consider it as a successful boot
            if self._RUN_MARKER not in out:
                raise BootFailedError("Boot failed", out, err, e.returncode) from e
            logger.debug("STDOUT: \n%s", out)
            logger.debug("STDERR: \n%s", err)
            return VMCommandResult(e.returncode, self._remove_boot_log(out), err)

        vm_stdout = vm.stdout
        logger.debug(vm_stdout)
        return VMCommandResult(vm.returncode, self._remove_boot_log(vm_stdout), None)


class VMCommandResult:
    """Exit code and output of a command executed inside the VM."""

    def __init__(self, returncode, stdout, stderr) -> None:
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr


class VirtualMachine:
    """Main VM class - simple interface for end users"""

    # Registry of available hypervisors
    _hypervisors = {
        "vmtest": Vmtest,
    }

    def __init__(
        self,
        kernel_image: "KernelImage",
        rootfs_path: str,
        command: str,
        hypervisor_type: str = "vmtest",
        **kwargs,
    ):
        """Create the VM configuration and instantiate the chosen backend.

        Raises:
            ValueError: If *hypervisor_type* is not in the registry.
        """
        self.config = VMConfig(kernel_image, rootfs_path, command, **kwargs)

        if hypervisor_type not in self._hypervisors:
            raise ValueError(f"Unsupported hypervisor: {hypervisor_type}")

        self.hypervisor = self._hypervisors[hypervisor_type]()

    @classmethod
    def list_hypervisors(cls) -> List[str]:
        """List available hypervisors"""
        return list(cls._hypervisors.keys())

    def execute(self):
        """Execute command in VM"""
        return self.hypervisor.run_command(self.config)


class BootFailedError(Exception):
    """Raised when VM fails to boot properly (before command execution)."""

    def __init__(
        self, message: str, stdout: str = "", stderr: str = "", returncode: int = -1
    ):
        super().__init__(message)
        self.stdout = stdout
        self.stderr = stderr
        self.returncode = returncode

    def __str__(self):
        """Return the message, the return code and any non-empty output."""
        output_parts = [
            super().__str__(),
            f"Return code: {self.returncode}",
        ]

        optional_sections = [
            ("STDOUT", self.stdout),
            ("STDERR", self.stderr),
        ]

        for header, content in optional_sections:
            if content:
                output_parts.append(f"--- {header} ---")
                output_parts.append(content)

        return "\n".join(output_parts)
