Hi Piyush.

> This patch adds the bpf-vmtest-tool subdirectory under contrib which tests
> BPF programs under a live kernel using a QEMU VM.  It can build the
> specified kernel version with eBPF support enabled
> and stores it under $VMTEST_DIR
>
> It can also compile BPF C source files or BPF bytecode objects and
> test them against the kernel verifier for errors.  When a BPF program
> is rejected by the kernel verifier, the verifier logs are displayed.


The BPF bytecode objects don't get compiled I suppose.


>
> $ python3 main.py -k 6.15 --bpf-src assets/ebpf-programs/fail.c
> BPF program failed to load
> Verifier logs:
>         btf_vmlinux is malformed
>         0: R1=ctx() R10=fp0
>         0: (81) r0 = *(s32 *)(r10 +4)
>         invalid read from stack R10 off=4 size=4
>         processed 1 insns (limit 1000000) max_states_per_insn 0 total_states 
> 0 peak_states 0 mark_read 0
>
> See the README for more examples.
>
> The script uses vmtest (https://github.com/danobi/vmtest) to boot
> the VM and run the program.  By default, it uses the host's root
> ("/") as the VM rootfs via the 9p filesystem, so only the kernel is
> replaced during testing.
>
> Tested with Python 3.9 and above.
>
> contrib/ChangeLog:
>
>       * bpf-vmtest-tool/README: New file.
>       * bpf-vmtest-tool/bpf.py: New file.
>       * bpf-vmtest-tool/config.py: New file.
>       * bpf-vmtest-tool/kernel.py: New file.
>       * bpf-vmtest-tool/main.py: New file.
>       * bpf-vmtest-tool/pyproject.toml: New file.
>       * bpf-vmtest-tool/tests/test_cli.py: New file.
>       * bpf-vmtest-tool/utils.py: New file.
>       * bpf-vmtest-tool/vm.py: New file.
>
> Signed-off-by: Piyush Raj <[email protected]>
> ---
>  contrib/bpf-vmtest-tool/README            | 157 ++++++++++++
>  contrib/bpf-vmtest-tool/bpf.py            | 221 +++++++++++++++++
>  contrib/bpf-vmtest-tool/config.py         |  50 ++++
>  contrib/bpf-vmtest-tool/kernel.py         | 290 ++++++++++++++++++++++
>  contrib/bpf-vmtest-tool/main.py           | 285 +++++++++++++++++++++
>  contrib/bpf-vmtest-tool/pyproject.toml    |  36 +++
>  contrib/bpf-vmtest-tool/tests/test_cli.py | 219 ++++++++++++++++
>  contrib/bpf-vmtest-tool/utils.py          |  31 +++
>  contrib/bpf-vmtest-tool/vm.py             | 169 +++++++++++++
>  9 files changed, 1458 insertions(+)
>  create mode 100644 contrib/bpf-vmtest-tool/README
>  create mode 100644 contrib/bpf-vmtest-tool/bpf.py
>  create mode 100644 contrib/bpf-vmtest-tool/config.py
>  create mode 100644 contrib/bpf-vmtest-tool/kernel.py
>  create mode 100644 contrib/bpf-vmtest-tool/main.py
>  create mode 100644 contrib/bpf-vmtest-tool/pyproject.toml
>  create mode 100644 contrib/bpf-vmtest-tool/tests/test_cli.py
>  create mode 100644 contrib/bpf-vmtest-tool/utils.py
>  create mode 100644 contrib/bpf-vmtest-tool/vm.py
>
> diff --git a/contrib/bpf-vmtest-tool/README b/contrib/bpf-vmtest-tool/README
> new file mode 100644
> index 00000000000..552b2a3e1c8
> --- /dev/null
> +++ b/contrib/bpf-vmtest-tool/README
> @@ -0,0 +1,157 @@
> +BPF vmtest Tool
> +===============
> +https://gcc.gnu.org/wiki/BPFRunTimeTests
> +
> +This directory contains a Python script to run BPF programs or shell commands
> +under a live Linux kernel using QEMU virtual machines.
> +
> +USAGE
> +=====
> +
> +Initial Setup
> +-------------
> +
> +Before using the tool, you must set the directory where vmtest will look for 
> +kernels and store kernel artifacts. You can do this in two ways:
> +
> +1. Set the VMTEST_DIR environment variable
> +2. Use the --vmtest-dir flag with each command
> +
> +Note: This is required to use the tool.
> +
> +Available Options
> +-----------------
> +
> +View all supported commands using the --help option:
> +
> +    usage: main.py [-h] [-v DEBUG|INFO|WARNING|ERROR] [--vmtest-dir DIR] 
> {bpf,vmtest,kernel} ...
> +
> +    BPF vmtest tool
> +
> +    positional arguments:
> +      {bpf,vmtest,kernel}   Available commands
> +        bpf                 BPF program management
> +        vmtest              Run VM tests
> +        kernel              Kernel management
> +
> +    options:
> +      -h, --help            show this help message and exit
> +      -v DEBUG|INFO|WARNING|ERROR, --log-level DEBUG|INFO|WARNING|ERROR
> +                            Log level
> +      --vmtest-dir DIR      Directory for vmtest artifacts (or set 
> VMTEST_DIR env variable)
> +
> +    Examples:
> +      # Compile BPF source to bytecode
> +      main.py bpf compile my_prog.bpf.c -o my_prog.bpf.o
> +
> +      # Run BPF program in VM
> +      main.py vmtest --kernel 6.15-x86_64 --bpf-src my_prog.bpf.c
> +
> +      # List available kernels
> +      main.py kernel list
> +
> +
> +COMMANDS
> +========
> +
> +kernel subcommand
> +-----------------
> +
> +You must build a kernel before using it.
> +
> +Build a kernel:
> +
> +    python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel build 
> 6.16
> +
> +The tool will download and build the specified kernel version from 
> +https://www.kernel.org/pub/linux/kernel and store the build artifacts in 
> +$VMTEST_DIR/kernels/linux-6.15-x86_64. Specifically, it stores bpftool, 
> +bzImage-6.15-x86_64, and vmlinux.h, which are used when compiling BPF 
> programs 
> +instead of relying on the host system.
> +
> +List available kernels:
> +
> +    python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel list
> +
> +Remove kernels:
> +
> +    python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel remove 
> 6.15-x86_64
> +
> +Note: If the architecture is omitted, the host architecture is assumed. For 
> +example, "6.15" will be treated as "6.15-x86_64" on an x86_64 system.
> +
> +vmtest subcommand
> +-----------------
> +
> +Run a shell command inside a live kernel VM:
> +
> +    python main.py vmtest -k 6.15 -r / -c "uname -a"


I see that -r/-rootfs is not documented, nor the other options which are
specific to subcommands.  Can we have a little description of each?

> +
> +Run a BPF source file in the VM:
> +
> +    python main.py vmtest -k 6.15 --bpf-src fail.c
> +
> +Run a precompiled BPF object file:
> +
> +    python main.py vmtest -k 6.15 --bpf-obj fail.bpf.o
> +
> +bpf subcommand
> +--------------
> +
> +You can compile BPF source to bytecode using the kernel-specific bpftool and 
> +vmlinux.h stored in $VMTEST_DIR:
> +
> +    python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" bpf compile 
> invalid-memory-access.c -k 6.15 -o /tmp/invalid-memory-access.bpf.o
> +
> +
> +LIMITATIONS
> +===========
> +
> +- Only x86_64 architecture is currently supported
> +- Only "/" (the root filesystem) is currently supported as the VM rootfs 
> when 
> +  running or testing BPF programs using --bpf-src or --bpf-obj
> +
> +
> +DEPENDENCIES
> +============
> +
> +- Python >= 3.9
> +- vmtest >= v0.18.0 (https://github.com/danobi/vmtest)
> +  - QEMU
> +  - qemu-guest-agent
> +
> +For compiling kernels:
> +- pahole
> +- https://docs.kernel.org/process/changes.html#current-minimal-requirements
> +
> +For compiling and loading BPF programs:
> +- libbpf
> +- gcc-bpf-unknown-none 
> (https://gcc.gnu.org/wiki/BPFBackEnd#Where_to_find_GCC_BPF)


So libbpf is still a host dependency.  Would it be possible to use the
libbpf of the built kernel instead?  Much like bpftool.

> +
> +
> +BUILD FLAGS
> +===========
> +
> +You can customize compiler settings using environment variables:
> +
> +- BPF_CC:          Compiler for the BPF program (default: 
> bpf-unknown-none-gcc)
> +- BPF_CFLAGS:      Extra flags for BPF program compilation (default: "-O2")
> +- BPF_INCLUDES:    Include paths for BPF (default: "-I/usr/local/include 
> -I/usr/include")
> +- VMTEST_CC:       Compiler for the user-space loader (default: gcc)
> +- VMTEST_CFLAGS:   Flags for compiling the loader (default: "-g -Wall")
> +- VMTEST_LDFLAGS:  Linker flags for the loader (default: "-lelf -lz -lbpf")
> +
> +Example usage:
> +
> +    BPF_CFLAGS="-O3 -g" BPF_CC="/bpf-gcc-build/gcc/xgcc" python main.py 
> vmtest -k 6.15 --bpf-src fail.c
> +
> +
> +DEVELOPMENT
> +===========
> +
> +Development dependencies are specified in pyproject.toml, which can be used 
> +with any suitable Python virtual environment manager.
> +
> +To run the test suite:
> +
> +    python3 -m pytest
> \ No newline at end of file

Please add a newline character here.

> diff --git a/contrib/bpf-vmtest-tool/bpf.py b/contrib/bpf-vmtest-tool/bpf.py
> new file mode 100644
> index 00000000000..714cf49abb8
> --- /dev/null
> +++ b/contrib/bpf-vmtest-tool/bpf.py
> @@ -0,0 +1,221 @@
> +import re
> +import subprocess
> +import logging
> +from pathlib import Path
> +import sys
> +import tempfile
> +from typing import Optional
> +import utils
> +import config
> +import os
> +
> +# Based on the compilation process described in:
> +# https://git.sr.ht/~brianwitte/gcc-bpf-example/tree/master/item/Makefile
> +
> +logger = logging.getLogger(__name__)
> +
> +
> +def generate_sanitized_name(path: Path):
> +    """generate sanitized c variable name"""
> +    name = re.sub(r"\W", "_", path.stem)
> +    if name and name[0].isdigit():
> +        name = "_" + name
> +    return name
> +
> +
> +class BPFProgram:
> +    tmp_base_dir = tempfile.TemporaryDirectory(prefix="vmtest")
> +    tmp_base_dir_path = Path(tmp_base_dir.name)
> +
> +    def __init__(
> +        self,
> +        source_path: Optional[Path] = None,
> +        bpf_bytecode_path: Optional[Path] = None,
> +        use_temp_dir: bool = False,
> +    ):
> +        path = source_path or bpf_bytecode_path
> +        self.name = generate_sanitized_name(path)
> +        self.build_dir = self.__class__.tmp_base_dir_path / "ebpf_programs" 
> / self.name
> +
> +        if source_path:
> +            self.bpf_src = source_path
> +            self.bpf_obj = self.build_dir / f"{self.name}.bpf.o"
> +        else:
> +            self.bpf_obj = bpf_bytecode_path
> +        self.build_dir.mkdir(parents=True, exist_ok=True)
> +        self.bpf_skel = self.build_dir / f"{self.name}.skel.h"
> +        self.loader_src = self.build_dir / f"{self.name}-loader.c"
> +        self.output = self.build_dir / f"{self.name}.o"
> +
> +    @classmethod
> +    def from_source(cls, source_path: Path, kernel_spec):
> +        self = cls(source_path=source_path)
> +        self._compile_bpf(kernel_spec)
> +        self._compile_from_bpf_bytecode(kernel_spec)
> +        return self.output
> +
> +    @classmethod
> +    def from_bpf_obj(cls, obj_path: Path, kernel_spec):
> +        self = cls(bpf_bytecode_path=obj_path)
> +        self._compile_from_bpf_bytecode(kernel_spec)
> +        return self.output
> +
> +    def compile_bpf(self, kernel_spec) -> Path:
> +        if self.bpf_src is None:
> +            raise ValueError(
> +                "Cannot compile BPF source: instance was created with "
> +                "bpf_bytecode_path instead of source_path"
> +            )
> +        self._compile_bpf(kernel_spec)
> +        return self.bpf_obj
> +
> +    def _compile_from_bpf_bytecode(self, kernel_spec):
> +        self._generate_skeleton(kernel_spec)
> +        self._compile_loader()
> +
> +    def _compile_bpf(self, kernel_spec):
> +        """Compile the eBPF program using gcc"""
> +        logger.info(f"Compiling eBPF source: {self.bpf_src}")
> +        cmd = [
> +            config.config.bpf_cc,
> +            f"-D__TARGET_ARCH_{config.config.arch}",
> +            "-gbtf",
> +            "-std=gnu17",
> +        ]
> +        cmd.append(f"-I{kernel_spec.vmlinux_path.parent}")
> +        cmd.extend(config.config.bpf_cflags.split(" "))
> +        cmd.extend(config.config.bpf_includes.split(" "))
> +        cmd.extend(
> +            [
> +                "-c",
> +                str(self.bpf_src),
> +                "-o",
> +                str(self.bpf_obj),
> +            ]
> +        )
> +        # remove variables that conflict with host compiler
> +        clean_env = os.environ.copy()
> +        clean_env.pop("GCC_EXEC_PREFIX", None)
> +        logger.debug("".join(cmd))
> +        try:
> +            utils.run_command(cmd, env=clean_env)
> +        except subprocess.CalledProcessError as e:
> +            logger.error(f"bpf compilation failed: {e}")
> +            sys.exit(1)
> +        logger.info(f"eBPF compiled: {self.bpf_obj}")
> +
> +    def _generate_skeleton(self, kernel_spec):
> +        """Generate the BPF skeleton header using bpftool"""
> +        logger.info(f"Generating skeleton: {self.bpf_skel}")
> +        cmd = [
> +            kernel_spec.bpftool_path,
> +            "gen",
> +            "skeleton",
> +            str(self.bpf_obj),
> +            "name",
> +            self.name,
> +        ]
> +        try:
> +            result = utils.run_command(cmd)
> +            with open(self.bpf_skel, "w") as f:
> +                f.write(result.stdout)
> +            logger.info("Skeleton generated.")
> +        except subprocess.CalledProcessError:
> +            logger.error("Failed to generate skeleton.")
> +            sys.exit(1)
> +
> +    def _compile_loader(self):
> +        """Compile the C loader program"""
> +        self.generate_loader()
> +        logger.info(f"Compiling loader: {self.loader_src}")
> +        cmd = [
> +            config.config.vmtest_cc,
> +            *config.config.vmtest_cflags.split(" "),
> +            "-I",
> +            str(self.build_dir),
> +            str(self.loader_src),
> +            *config.config.vmtest_ldflags.split(" "),
> +            "-o",
> +            str(self.output),
> +        ]
> +        # remove variables that conflict with host compiler
> +        clean_env = os.environ.copy()
> +        clean_env.pop("GCC_EXEC_PREFIX", None)
> +        try:
> +            utils.run_command(cmd, env=clean_env)
> +        except subprocess.CalledProcessError as e:
> +            logger.error(f"bpf loader compilation failed: {e}")
> +            sys.exit(1)
> +
> +        logger.info("Compilation complete")
> +
> +    def generate_loader(self):
> +        """
> +        Generate a loader C file for the given BPF skeleton.
> +
> +        Args:
> +            bpf_name (str): Name of the BPF program (e.g. "prog").
> +            output_path (str): Path to write loader.c.
> +        """
> +        skeleton_header = f"{self.name}.skel.h"
> +        loader_code = f"""\
> +        #include <stdio.h>
> +        #include <stdlib.h>
> +        #include <signal.h>
> +        #include <unistd.h>
> +        #include <bpf/libbpf.h>
> +        #include "{skeleton_header}"
> +
> +        #define LOG_BUF_SIZE 1024 * 1024
> +
> +        static volatile sig_atomic_t stop;
> +        static char log_buf[LOG_BUF_SIZE];
> +
> +        void handle_sigint(int sig) {{
> +            stop = 1;
> +        }}
> +
> +        int main() {{
> +            struct {self.name} *skel;
> +            struct bpf_program *prog;
> +            int err;
> +
> +            signal(SIGINT, handle_sigint);
> +
> +            skel = {self.name}__open();  // STEP 1: open only
> +            if (!skel) {{
> +                fprintf(stderr, "Failed to open BPF skeleton\\n");
> +                return 1;
> +            }}
> +
> +            // STEP 2: Get the bpf_program object for the main program
> +            bpf_object__for_each_program(prog, skel->obj) {{
> +                bpf_program__set_log_buf(prog, log_buf, sizeof(log_buf));
> +                bpf_program__set_log_level(prog, 1); // optional: verbose 
> logs
> +            }}
> +
> +            // STEP 3: Load the program (this will trigger verifier log 
> output)
> +            err = {self.name}__load(skel);
> +            fprintf(
> +             stderr,
> +             "--- Verifier log start ---\\n"
> +             "%s\\n"
> +             "--- Verifier log end ---\\n",
> +             log_buf
> +             );
> +            if (err) {{
> +                fprintf(stderr, "Failed to load BPF skeleton: %d\\n", err);
> +                {self.name}__destroy(skel);
> +                return 1;
> +            }}
> +
> +            printf("BPF program loaded successfully.\\n");
> +
> +            {self.name}__destroy(skel);
> +            return 0;
> +        }}
> +
> +        """
> +        with open(self.loader_src, "w") as f:
> +            f.write(loader_code)
> +        logger.info(f"Generated loader at {self.loader_src}")
> diff --git a/contrib/bpf-vmtest-tool/config.py 
> b/contrib/bpf-vmtest-tool/config.py
> new file mode 100644
> index 00000000000..5f94f8f69bf
> --- /dev/null
> +++ b/contrib/bpf-vmtest-tool/config.py
> @@ -0,0 +1,50 @@
> +import platform
> +from pathlib import Path
> +import os
> +from dataclasses import dataclass
> +
> +
> +@dataclass
> +class VMTestConfig:
> +    """Configuration for BPF vmtest tool"""
> +
> +    vmtest_dir: Path
> +    kernel_tarball_url: str = "https://cdn.kernel.org/pub/linux/kernel/";
> +    arch: str = platform.machine()
> +    vmtest_cc: str = os.getenv("VMTEST_CC", "gcc")
> +    vmtest_cflags: str = os.getenv("VMTEST_CFLAGS", "-g -Wall -Werror ")
> +    vmtest_ldflags: str = os.getenv("VMTEST_LDFLAGS", "-lelf -lz -lbpf")
> +    bpf_cc: str = os.getenv("BPF_CC", "bpf-unknown-none-gcc")
> +    bpf_cflags: str = os.getenv("BPF_CFLAGS", "-O2 -Wall -Werror")
> +    bpf_includes: str = os.getenv("BPF_INCLUDES", "-I/usr/local/include 
> -I/usr/include")
> +
> +    @property
> +    def kconfig_rel_paths(self) -> list:
> +        """Kernel config paths relative to kernel directory"""
> +        return [
> +            "tools/testing/selftests/bpf/config",
> +            "tools/testing/selftests/bpf/config.vm",
> +            f"tools/testing/selftests/bpf/config.{self.arch}",
> +        ]
> +
> +    @property
> +    def kernels_dir(self) -> Path:
> +        """Get kernels directory"""
> +        return self.vmtest_dir / "kernels"
> +
> +    def __post_init__(self):
> +        """Validate vmtest_dir exists"""
> +        if not self.vmtest_dir.exists():
> +            raise ValueError(f"VMTEST_DIR does not exist: {self.vmtest_dir}")
> +        if not self.vmtest_dir.is_dir():
> +            raise ValueError(f"VMTEST_DIR is not a directory: 
> {self.vmtest_dir}")
> +
> +
> +# Global config instance
> +config = None
> +
> +
> +def init_config(vmtest_dir: str):
> +    """Initialize global config"""
> +    global config
> +    config = VMTestConfig(vmtest_dir=Path(vmtest_dir))
> diff --git a/contrib/bpf-vmtest-tool/kernel.py 
> b/contrib/bpf-vmtest-tool/kernel.py
> new file mode 100644
> index 00000000000..f60bc859206
> --- /dev/null
> +++ b/contrib/bpf-vmtest-tool/kernel.py
> @@ -0,0 +1,290 @@
> +import logging
> +import os
> +import shutil
> +import subprocess
> +from pathlib import Path
> +import re
> +import sys
> +from urllib.parse import urljoin
> +from urllib.request import urlretrieve
> +from typing import Optional, List
> +from dataclasses import dataclass
> +
> +import config
> +import utils
> +
> +logger = logging.getLogger(__name__)
> +
> +
> +@dataclass
> +class KernelSpec:
> +    """Immutable kernel specification"""
> +
> +    version: str
> +    arch: str | None = None
> +
> +    def __post_init__(self):
> +        if self.arch is None or self.arch == "":
> +            self.arch = config.config.arch
> +        self.major = self.version.split(".")[0]
> +
> +    def __str__(self):
> +        return f"{self.version}-{self.arch}"
> +
> +    @property
> +    def kernel_build_dir(self) -> Path:
> +        return config.config.kernels_dir / f"linux-{self}-build"
> +
> +    @property
> +    def kernel_dir(self) -> Path:
> +        return config.config.kernels_dir / f"linux-{self}"
> +
> +    @property
> +    def bzimage_path(self) -> Path:
> +        return self.kernel_dir / f"bzImage-{self}"
> +
> +    @property
> +    def bpftool_path(self) -> Path:
> +        return self.kernel_dir / "bpftool"
> +
> +    @property
> +    def vmlinux_path(self) -> Path:
> +        return self.kernel_dir / "vmlinux.h"
> +
> +    @property
> +    def tarball_path(self) -> Path:
> +        return config.config.kernels_dir / f"linux-{self}.tar.xz"
> +
> +
> +class KernelImage:
> +    """Represents a compiled kernel image"""
> +
> +    def __init__(self, path: Path):
> +        if not isinstance(path, Path):
> +            path = Path(path)
> +
> +        if not path.exists():
> +            raise FileNotFoundError(f"Kernel image not found: {path}")
> +
> +        self.path = path
> +
> +    def __str__(self):
> +        return str(self.path)
> +
> +
> +class KernelCompiler:
> +    """Handles complete kernel compilation process including download and 
> build"""
> +
> +    @staticmethod
> +    def _progress_hook(block_num: int, block_size: int, total_size: int) -> 
> None:
> +        """Progress hook for urlretrieve to display download progress"""
> +        if total_size <= 0:
> +            return
> +
> +        downloaded = block_num * block_size
> +        percent = min(downloaded * 100 // total_size, 100)
> +        bar_length = 10
> +        filled = int(bar_length * downloaded // total_size)
> +        bar = "#" * filled + "-" * (bar_length - filled)
> +
> +        if logger.getEffectiveLevel() <= logging.INFO:
> +            downloaded_mb = downloaded / (1024 * 1024)
> +            total_mb = total_size / (1024 * 1024)
> +
> +            if sys.stdout.isatty():
> +                sys.stdout.write(
> +                    f"\rDownloading: |{bar}| {percent}% 
> ({downloaded_mb:.2f}MB / {total_mb:.2f}MB)"
> +                )
> +                sys.stdout.flush()
> +                if downloaded >= total_size:
> +                    sys.stdout.write("\n")
> +                    sys.stdout.flush()
> +            else:
> +                if downloaded >= total_size or downloaded % (block_size * 
> 100) == 0:
> +                    logger.info(
> +                        f"Downloading: {percent}% ({downloaded_mb:.2f}MB / 
> {total_mb:.2f}MB)"
> +                    )
> +
> +    def compile_from_version(self, spec: KernelSpec) -> KernelImage:
> +        """Complete compilation process from kernel version"""
> +        if spec.bzimage_path.exists():
> +            logger.info(f"Kernel {spec} already exists, skipping 
> compilation")
> +            return KernelImage(spec.bzimage_path)
> +
> +        try:
> +            self._download_source(spec)
> +            self._extract_source(spec)
> +            self._configure_kernel(spec)
> +            self._compile_kernel(spec)
> +            self._copy_bzimage(spec)
> +            # generate vmlinux.h from the kernel by starign the vm from the 
> build kernel
> +
> +            logger.info(f"Successfully compiled kernel {spec}")
> +            return KernelImage(spec.bzimage_path)
> +
> +        except Exception as e:
> +            logger.error(f"Failed to compile kernel {spec}: {e}")
> +            sys.exit(1)
> +        finally:
> +            # Always cleanup temporary files
> +            self._cleanup(spec)
> +
> +    def _download_source(self, spec: KernelSpec) -> None:
> +        """Download kernel source tarball"""
> +        if spec.tarball_path.exists():
> +            logger.info(f"Tarball already exists: {spec.tarball_path}")
> +            return
> +
> +        url_suffix = f"v{spec.major}.x/linux-{spec.version}.tar.xz"
> +        url = urljoin(config.config.kernel_tarball_url, url_suffix)
> +
> +        logger.info(f"Downloading kernel from {url}")
> +        spec.tarball_path.parent.mkdir(parents=True, exist_ok=True)
> +        urlretrieve(url, spec.tarball_path, reporthook=self._progress_hook)
> +        logger.info("Kernel source downloaded")
> +
> +    def _extract_source(self, spec: KernelSpec) -> None:
> +        """Extract kernel source tarball"""
> +        logger.info(f"Extracting kernel source to {spec.kernel_build_dir}")
> +        spec.kernel_build_dir.mkdir(parents=True, exist_ok=True)
> +
> +        utils.run_command(
> +            [
> +                "tar",
> +                "-xf",
> +                str(spec.tarball_path),
> +                "-C",
> +                str(spec.kernel_build_dir),
> +                "--strip-components=1",
> +            ]
> +        )
> +
> +    def _configure_kernel(self, spec: KernelSpec) -> None:
> +        """Configure kernel with provided config files"""
> +        config_path = spec.kernel_build_dir / ".config"
> +
> +        with open(config_path, "wb") as kconfig:
> +            for config_rel_path in config.config.kconfig_rel_paths:
> +                config_abs_path = spec.kernel_build_dir / config_rel_path
> +                if config_abs_path.exists():
> +                    with open(config_abs_path, "rb") as conf:
> +                        kconfig.write(conf.read())
> +
> +        logger.info("Updated kernel configuration")
> +
> +    def _compile_kernel(self, spec: KernelSpec) -> None:
> +        """Compile the kernel"""
> +        logger.info(f"Compiling kernel in {spec.kernel_build_dir}")
> +        old_cwd = os.getcwd()
> +
> +        try:
> +            os.chdir(spec.kernel_build_dir)
> +            # pahole is required for the DEBUG_INFO_BTF kernel configuration 
> option.
> +            pahole_path = shutil.which("pahole")
> +            if pahole_path is None:
> +                logger.error(
> +                    "pahole not found in PATH. BTF generation requires 
> pahole v1.16 or later."
> +                )
> +                sys.exit(1)
> +            friendly_cores = os.cpu_count() - 2
> +            utils.run_command(["make", "olddefconfig"], stream_output=True)
> +            logger.info("Starting kernel compilation")
> +            utils.run_command(
> +                ["make", f"-j{friendly_cores}", "bzImage"], 
> stream_output=True
> +            )
> +            logger.info("Compiling bpftool")
> +            utils.run_command(
> +                ["make", "-C", "tools/bpf", f"-j{friendly_cores}", 
> "bpftool"],
> +                stream_output=True,
> +            )
> +        except subprocess.CalledProcessError as e:
> +            logger.error(f"Kernel compilation failed: {e}")
> +            sys.exit(1)
> +        finally:
> +            os.chdir(old_cwd)
> +
> +    def _copy_bzimage(self, spec: KernelSpec) -> None:
> +        """Copy compiled bzImage to final location"""
> +        # compile the bpftool as well
> +        src = spec.kernel_build_dir / "arch/x86/boot/bzImage"
> +        dest = spec.bzimage_path
> +        dest.parent.mkdir(parents=True, exist_ok=True)
> +
> +        shutil.copy2(src, dest)
> +        logger.info(f"Stored bzImage at {dest}")
> +
> +        shutil.copy2(
> +            spec.kernel_build_dir / "tools/bpf/bpftool/vmlinux.h", 
> spec.vmlinux_path
> +        )
> +        logger.info(f"Stored vmlinux at {spec.vmlinux_path}")
> +
> +        shutil.copy2(
> +            spec.kernel_build_dir / "tools/bpf/bpftool/bpftool", 
> spec.bpftool_path
> +        )
> +        logger.info(f"Stored bpftool at {spec.bpftool_path}")
> +
> +    def _cleanup(self, spec: KernelSpec) -> None:
> +        """Clean up temporary files"""
> +        if spec.tarball_path.exists():
> +            spec.tarball_path.unlink()
> +            logger.info("Removed tarball")
> +
> +        if spec.kernel_build_dir.exists():
> +            shutil.rmtree(spec.kernel_build_dir)
> +            logger.info("Removed kernel source directory")
> +
> +
> +class KernelManager:
> +    """Main interface for kernel management"""
> +
> +    def __init__(self):
> +        self.compiler = KernelCompiler()
> +
> +    def remove_kernel(self, name: str) -> None:
> +        """Remove compiled kernel by version"""
> +        version, _, arch = name.partition("-")
> +        spec = KernelSpec(version=version, arch=arch)
> +        if spec.kernel_dir.exists():
> +            shutil.rmtree(spec.kernel_dir)
> +            logger.info(f"Removed kernel {spec}")
> +        else:
> +            logger.error(
> +                f"Kernel {spec} does not exist, path {spec.kernel_dir} not 
> found"
> +            )
> +            raise SystemExit(1)
> +
> +    def build_kernel(self, version: str, arch=None) -> None:
> +        """Build kernel from version"""
> +
> +        spec = KernelSpec(version=version, arch=arch)
> +        self.compiler.compile_from_version(spec)
> +
> +    @staticmethod
> +    def get_kernel_from_version(
> +        version: str,
> +    ):
> +        """Get kernel image from version"""
> +        version, _, arch = version.partition("-")
> +        spec = KernelSpec(version=version, arch=arch)
> +        if spec.bzimage_path.exists():
> +            return spec, KernelImage(spec.bzimage_path)
> +        else:
> +            raise FileNotFoundError(
> +                f"Kernel {spec} not found. Use 'main.py kernel build' to 
> create it."
> +            )
> +
> +    def list_kernels(self) -> List[str]:
> +        """List all available compiled kernels"""
> +        if not config.config.kernels_dir.exists():
> +            raise FileNotFoundError(
> +                f"Kernels directory not found: {config.config.kernels_dir}"
> +            )
> +
> +        kernels = []
> +        for file in config.config.kernels_dir.glob("linux-*"):
> +            if file.is_dir():
> +                match = re.match(r"linux-(.*)", file.name)
> +                if match:
> +                    kernels.append(match.group(1))
> +
> +        return sorted(kernels)
> diff --git a/contrib/bpf-vmtest-tool/main.py b/contrib/bpf-vmtest-tool/main.py
> new file mode 100644
> index 00000000000..43b6036c615
> --- /dev/null
> +++ b/contrib/bpf-vmtest-tool/main.py
> @@ -0,0 +1,285 @@
> +import argparse
> +import logging
> +from pathlib import Path
> +import sys
> +import textwrap
> +import os
> +
> +import bpf
> +import kernel
> +import vm
> +import config
> +
> +logger = logging.getLogger(__name__)
> +
> +
> +def cmd_kernel_list(args):
> +    """List all available kernels"""
> +    kmanager = kernel.KernelManager()
> +    kernels = kmanager.list_kernels()
> +    if kernels:
> +        for k in kernels:
> +            print(k)
> +    else:
> +        logger.info("No kernels available")
> +
> +
> +def cmd_kernel_remove(args):
> +    """Remove a kernel"""
> +    kmanager = kernel.KernelManager()
> +    if not args.kernel:
> +        logger.error("kernel version required for remove action")
> +        sys.exit(1)
> +    kmanager.remove_kernel(args.kernel)
> +    logger.info(f"Kernel {args.kernel} removed")
> +    print(f"Kernel {args.kernel} removed")
> +
> +
> +def cmd_kernel_build(args):
> +    """Build a kernel"""
> +    kmanager = kernel.KernelManager()
> +    if not args.kernel:
> +        logger.error("kernel version required for build action")
> +        sys.exit(1)
> +    kmanager.build_kernel(version=args.kernel)
> +
> +
> +def cmd_bpf_compile(args):
> +    """Compile BPF source to bytecode only"""
> +    kmanager = kernel.KernelManager()
> +
> +    try:
> +        kernel_spec, _ = 
> kmanager.get_kernel_from_version(version=args.kernel)
> +    except Exception as e:
> +        logger.error(f"Failed to get kernel: {e}")
> +        sys.exit(1)
> +
> +    try:
> +        bpf_program = bpf.BPFProgram(source_path=Path(args.bpf_src))
> +        output_path = bpf_program.compile_bpf(kernel_spec)
> +
> +        if args.output:
> +            import shutil
> +
> +            output_dest = Path(args.output)
> +            shutil.copy2(output_path, output_dest)
> +            logger.info(f"Copied to: {output_dest}")
> +
> +    except Exception as e:
> +        logger.error(f"Failed to compile BPF source: {e}")
> +        sys.exit(1)
> +
> +
> +def cmd_vmtest(args):
> +    """Handle vmtest subcommand"""
> +    kmanager = kernel.KernelManager()
> +
> +    try:
> +        kernel_spec, kernel_image = kmanager.get_kernel_from_version(
> +            version=args.kernel
> +        )
> +    except Exception as e:
> +        logger.error(f"Failed to get kernel: {e}")
> +        sys.exit(1)
> +
> +    try:
> +        if args.bpf_src:
> +            command = bpf.BPFProgram.from_source(Path(args.bpf_src), 
> kernel_spec)
> +        elif args.bpf_obj:
> +            command = bpf.BPFProgram.from_bpf_obj(Path(args.bpf_obj), 
> kernel_spec)
> +        elif args.command:
> +            command = args.command
> +    except Exception as e:
> +        logger.error(f"Failed to prepare command for vmtest: {e}")
> +        sys.exit(1)
> +
> +    virtual_machine = vm.VirtualMachine(kernel_image, args.rootfs, 
> str(command))
> +    try:
> +        result = virtual_machine.execute()
> +    except vm.BootFailedError as e:
> +        logger.error(f"VM boot failure: {e}")
> +        sys.exit(e.returncode)
> +
> +    if args.bpf_src or args.bpf_obj:
> +        if result.returncode == 0:
> +            print("BPF programs successfully loaded")
> +        else:
> +            if "Failed to load BPF skeleton" in result.stdout:
> +                print("BPF program failed to load")
> +                print("Verifier logs:")
> +                print(textwrap.indent(vm.bpf_verifier_logs(result.stdout), 
> "\t"))
> +    elif args.command:
> +        print(result.stdout)
> +
> +    sys.exit(result.returncode)
> +
> +
> +def main():
> +    parser = argparse.ArgumentParser(
> +        description="BPF vmtest tool",
> +        formatter_class=argparse.RawDescriptionHelpFormatter,
> +        epilog=textwrap.dedent("""
> +            Examples:
> +              # Compile BPF source to bytecode
> +              %(prog)s bpf compile my_prog.bpf.c -o my_prog.bpf.o
> +              
> +              # Run BPF program in VM
> +              %(prog)s vmtest --kernel 6.15-x86_64 --bpf-src my_prog.bpf.c
> +              
> +              # List available kernels
> +              %(prog)s kernel list
> +        """),
> +    )
> +
> +    parser.add_argument(
> +        "-v",
> +        "--log-level",
> +        help="Log level",
> +        metavar="DEBUG|INFO|WARNING|ERROR",
> +        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
> +        default="INFO",
> +    )
> +
> +    parser.add_argument(
> +        "--vmtest-dir",
> +        help="Directory for vmtest artifacts (or set VMTEST_DIR env 
> variable)",
> +        metavar="DIR",
> +        type=str,
> +        default=os.getenv("VMTEST_DIR"),
> +    )
> +
> +    subparsers = parser.add_subparsers(dest="subcommand", help="Available 
> commands")
> +
> +    # BPF subcommand
> +    bpf_subparser = subparsers.add_parser("bpf", help="BPF program 
> management")
> +    bpf_subparsers = bpf_subparser.add_subparsers(dest="bpf_action", 
> help="BPF actions")
> +
> +    # bpf compile subcommand
> +    compile_parser = bpf_subparsers.add_parser(
> +        "compile", help="Compile BPF source to bytecode (.bpf.o)"
> +    )
> +    compile_parser.add_argument(
> +        "bpf_src",
> +        help="Path to BPF C source file",
> +        type=str,
> +    )
> +    compile_parser.add_argument(
> +        "-o",
> +        "--output",
> +        help="Output path for compiled bytecode (optional, defaults to temp 
> dir)",
> +        metavar="PATH",
> +        type=str,
> +        required=True,
> +    )
> +    compile_parser.add_argument(
> +        "-k",
> +        "--kernel",
> +        help="Kernel version to use for compilation",
> +        metavar="VERSION",
> +        type=str,
> +        required=True,
> +    )
> +
> +    compile_parser.set_defaults(func=cmd_bpf_compile)
> +
> +    # VMtest subcommand
> +    vmtest_parser = subparsers.add_parser("vmtest", help="Run VM tests")
> +    vmtest_parser.set_defaults(func=cmd_vmtest)
> +
> +    vmtest_parser.add_argument(
> +        "-k",
> +        "--kernel",
> +        help="Kernel version to boot in the vm",
> +        metavar="VERSION",
> +        type=str,
> +        required=True,
> +    )
> +    vmtest_parser.add_argument(
> +        "-r", "--rootfs", help="rootfs to mount in the vm", default="/", 
> metavar="PATH"
> +    )
> +    command_group = vmtest_parser.add_mutually_exclusive_group(required=True)
> +    command_group.add_argument(
> +        "--bpf-src",
> +        help="Path to BPF C source file",
> +        metavar="PATH",
> +        type=str,
> +    )
> +    command_group.add_argument(
> +        "--bpf-obj",
> +        help="Path to bpf bytecode object",
> +        metavar="PATH",
> +        type=str,
> +    )
> +    command_group.add_argument(
> +        "-c", "--command", help="command to run in the vm", metavar="COMMAND"
> +    )
> +
> +    # Kernel subcommand with nested subcommands
> +    kernel_subparser = subparsers.add_parser("kernel", help="Kernel 
> management")
> +    kernel_subparsers = kernel_subparser.add_subparsers(
> +        dest="kernel_action", help="Kernel actions"
> +    )
> +
> +    # kernel list subcommand
> +    list_parser = kernel_subparsers.add_parser(
> +        "list", help="List all available kernels"
> +    )
> +    list_parser.set_defaults(func=cmd_kernel_list)
> +
> +    # kernel remove subcommand
> +    remove_parser = kernel_subparsers.add_parser("remove", help="Remove a 
> kernel")
> +    remove_parser.add_argument(
> +        "kernel", help="Kernel version to remove (e.g., 6.15-x86_64)"
> +    )
> +    remove_parser.set_defaults(func=cmd_kernel_remove)
> +
> +    # kernel build subcommand
> +    build_parser = kernel_subparsers.add_parser("build", help="Build a 
> kernel")
> +    build_parser.add_argument(
> +        "kernel", help="Kernel version to build (e.g. 6.15-x86_64)"
> +    )
> +    build_parser.set_defaults(func=cmd_kernel_build)
> +
> +    args = parser.parse_args()
> +    logging.basicConfig(level=args.log_level, format="%(levelname)s: 
> %(message)s")
> +
> +    if not args.vmtest_dir:
> +        logger.error(
> +            "VMTEST_DIR not specified. Use --vmtest-dir=DIR or set 
> VMTEST_DIR environment variable"
> +        )
> +        sys.exit(1)
> +
> +    vmtest_path = Path(args.vmtest_dir)
> +    if not vmtest_path.exists():
> +        logger.error(f"VMTEST_DIR does not exist: {vmtest_path}")
> +        sys.exit(1)
> +
> +    if not vmtest_path.is_dir():
> +        logger.error(f"VMTEST_DIR is not a directory: {vmtest_path}")
> +        sys.exit(1)
> +
> +    try:
> +        config.init_config(vmtest_dir=args.vmtest_dir)
> +    except ValueError as e:
> +        logger.error(str(e))
> +        sys.exit(1)
> +
> +    logger.debug(f"VMTEST_DIR set to: {args.vmtest_dir}")
> +
> +    if hasattr(args, "func"):
> +        args.func(args)
> +        sys.exit(0)
> +    else:
> +        parser.print_help()
> +        sys.exit(1)
> +
> +
> +if __name__ == "__main__":
> +    try:
> +        main()
> +    except KeyboardInterrupt:
> +        logger.error("Operation cancelled by user")
> +        sys.exit(1)
> +    except Exception as e:
> +        logger.error(f"Unknown error: {e}")
> +        sys.exit(1)
> diff --git a/contrib/bpf-vmtest-tool/pyproject.toml 
> b/contrib/bpf-vmtest-tool/pyproject.toml
> new file mode 100644
> index 00000000000..1977612cfd6
> --- /dev/null
> +++ b/contrib/bpf-vmtest-tool/pyproject.toml
> @@ -0,0 +1,36 @@
> +[project]
> +name = "bpf-vmtest-tool"
> +version = "0.1.0"
> +description = "Test BPF code against live kernels"
> +readme = "README.md"
> +requires-python = ">=3.9"
> +dependencies = []
> +
> +[dependency-groups]
> +dev = [
> +    "pre-commit>=4.2.0",
> +    "pytest>=8.4.0",
> +    "pytest-sugar>=1.0.0",
> +    "ruff>=0.11.13",
> +    "tox>=4.26.0",
> +]
> +
> +[tool.pytest.ini_options]
> +addopts = [
> +    "--import-mode=importlib",
> +]
> +testpaths = ["tests"]
> +pythonpath = ["."]
> +
> +[tool.ruff.lint]
> +select = [
> +    # pycodestyle
> +    "E",
> +    # Pyflakes
> +    "F",
> +]
> +# Allow fix for all enabled rules (when `--fix`) is provided.
> +fixable = ["ALL"]
> +
> +# Allow unused variables when underscore-prefixed.
> +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
> \ No newline at end of file
> diff --git a/contrib/bpf-vmtest-tool/tests/test_cli.py 
> b/contrib/bpf-vmtest-tool/tests/test_cli.py
> new file mode 100644
> index 00000000000..d9a01328bf2
> --- /dev/null
> +++ b/contrib/bpf-vmtest-tool/tests/test_cli.py
> @@ -0,0 +1,219 @@
> +import sys
> +from unittest import mock
> +import pytest
> +from bpf import BPFProgram
> +import kernel
> +import main
> +import logging
> +import config
> +import os
> +
> +logger = logging.getLogger(__name__)
> +
> +
> [email protected]
> +def tmp_config():
> +    VMTEST_DIR = "/home/d3bug/.bpf-vmtest-tool"
> +    assert VMTEST_DIR is not None, "Specify VMTEST_DIR environment varible"
> +    config.init_config(vmtest_dir=VMTEST_DIR)
> +
> +
> +# reset config for every test
> [email protected](autouse=True)
> +def reset_config():
> +    config.config = None
> +
> +
> [email protected]
> +def openat_bpf_source(tmp_path):
> +    openat_bpf = tmp_path / "openat_bpf.c"
> +    openat_bpf.write_text(r"""
> +    #include "vmlinux.h"
> +    #include <bpf/bpf_helpers.h>
> +    #include <bpf/bpf_tracing.h>
> +    #include <bpf/bpf_core_read.h>
> +
> +    char LICENSE[] SEC("license") = "GPL";
> +
> +    int example_pid = 0;
> +
> +    SEC("tracepoint/syscalls/sys_enter_openat")
> +    int handle_openat(struct trace_event_raw_sys_enter *ctx)
> +    {
> +        int pid = bpf_get_current_pid_tgid() >> 32;
> +        char filename[256]; // filename buffer
> +        bpf_probe_read_user(&filename, sizeof(filename), (void 
> *)ctx->args[1]);
> +        bpf_printk("sys_enter_openat() called from PID %d for file: %s\n", 
> pid,
> +            filename);
> +
> +        return 0;
> +    }
> +
> +    """)
> +    return openat_bpf
> +
> +
> [email protected]
> +def openat_bpf_obj(openat_bpf_source, tmp_config):
> +    def _create_openat_bpf_obj(kernel_spec):
> +        bpf_program = BPFProgram(source_path=openat_bpf_source)
> +        bpf_program._compile_bpf(kernel_spec)
> +        return bpf_program.bpf_obj
> +
> +    return _create_openat_bpf_obj
> +
> +
> [email protected]
> +def invalid_memory_access_bpf_source(tmp_path):
> +    invalid_memory_access_bpf = tmp_path / "invalid_memory_access_bpf.c"
> +    invalid_memory_access_bpf.write_text(r"""
> +    #include "vmlinux.h"
> +    #include <bpf/bpf_helpers.h>
> +    #include <bpf/bpf_tracing.h>
> +
> +    char LICENSE[] SEC("license") = "GPL";
> +
> +    SEC("tracepoint/syscalls/sys_enter_openat")
> +    int bpf_prog(struct trace_event_raw_sys_enter *ctx) {
> +        int arr[4] = {1, 2, 3, 4};
> +
> +        // Invalid memory access: out-of-bounds
> +        int val = arr[5];  // This causes the verifier to fail
> +
> +        return val;
> +    }
> +    """)
> +    return invalid_memory_access_bpf
> +
> +
> [email protected]
> +def invalid_memory_access_bpf_obj(invalid_memory_access_bpf_source, 
> tmp_config):
> +    def _create_invalid_memory_access_bpf_obj(kernel_spec):
> +        bpf_program = 
> BPFProgram(source_path=invalid_memory_access_bpf_source)
> +        bpf_program._compile_bpf(kernel_spec)
> +        return bpf_program.bpf_obj
> +
> +    return _create_invalid_memory_access_bpf_obj
> +
> +
> +def run_main_with_args_and_capture_output(args, capsys):
> +    with mock.patch.object(sys, "argv", args):
> +        try:
> +            main.main()
> +        except SystemExit as e:
> +            result = capsys.readouterr()
> +            output = result.out.rstrip()
> +            error = result.err.rstrip()
> +            logger.debug("STDOUT:\n%s", output)
> +            logger.debug("STDERR:\n%s", error)
> +            return (e.code, output, error)
> +        except Exception as e:
> +            pytest.fail(f"Unknown error happend: {e}")
> +        else:
> +            pytest.fail("Expected main to raise SystemExit")
> +
> +
> +KERNEL_VERSION = "6.16"
> +kernel_cli_flags = [["--kernel", KERNEL_VERSION]]
> +
> +
> [email protected]("kernel_args", kernel_cli_flags)
> +class TestCLI:
> +    def test_main_with_valid_bpf(self, kernel_args, openat_bpf_source, 
> capsys):
> +        args = [
> +            "main.py",
> +            "vmtest",
> +            *kernel_args,
> +            "--rootfs",
> +            "/",
> +            "--bpf-src",
> +            str(openat_bpf_source),
> +        ]
> +        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
> +        assert code == 0
> +        assert "BPF programs successfully loaded" == output
> +
> +    def test_main_with_valid_bpf_obj(self, kernel_args, openat_bpf_obj, 
> capsys):
> +        args = [
> +            "main.py",
> +            "vmtest",
> +            *kernel_args,
> +            "--rootfs",
> +            "/",
> +            "--bpf-obj",
> +            str(openat_bpf_obj(kernel.KernelSpec(kernel_args[1]))),
> +        ]
> +        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
> +        assert code == 0
> +        assert "BPF programs successfully loaded" == output
> +
> +    def test_main_with_invalid_bpf(
> +        self, kernel_args, invalid_memory_access_bpf_source, capsys
> +    ):
> +        args = [
> +            "main.py",
> +            "vmtest",
> +            *kernel_args,
> +            "--rootfs",
> +            "/",
> +            "--bpf-src",
> +            str(invalid_memory_access_bpf_source),
> +        ]
> +        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
> +        output_lines = output.splitlines()
> +        assert code == 1
> +        assert "BPF program failed to load" == output_lines[0]
> +        assert "Verifier logs:" == output_lines[1]
> +
> +    def test_main_with_invalid_bpf_obj(
> +        self, kernel_args, invalid_memory_access_bpf_obj, capsys
> +    ):
> +        args = [
> +            "main.py",
> +            "vmtest",
> +            *kernel_args,
> +            "--rootfs",
> +            "/",
> +            "--bpf-obj",
> +            
> str(invalid_memory_access_bpf_obj(kernel.KernelSpec(kernel_args[1]))),
> +        ]
> +        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
> +        output_lines = output.splitlines()
> +        assert code == 1
> +        assert "BPF program failed to load" == output_lines[0]
> +        assert "Verifier logs:" == output_lines[1]
> +
> +    def test_main_with_valid_command(self, kernel_args, capsys):
> +        args = ["main.py", "vmtest", *kernel_args, "--rootfs", "/", "-c", 
> "uname -r"]
> +        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
> +        assert code == 0
> +        assert f"{kernel_args[1]}.0" in output
> +
> +    def test_main_with_invalid_command(self, kernel_args, capsys):
> +        args = [
> +            "main.py",
> +            "vmtest",
> +            *kernel_args,
> +            "--rootfs",
> +            "/",
> +            "-c",
> +            "NotImplemented",
> +        ]
> +        code, output, error = run_main_with_args_and_capture_output(args, 
> capsys)
> +        assert code != 0
> +        assert f"Command failed with exit code: {code}" in output
> +
> +    def test_bpf_compile_subcommand(
> +        self, kernel_args, openat_bpf_source, tmp_path, capsys
> +    ):
> +        args = [
> +            "main.py",
> +            "bpf",
> +            "compile",
> +            *kernel_args,
> +            "-o",
> +            "",
> +            str(openat_bpf_source),
> +        ]
> +        code, _, _ = run_main_with_args_and_capture_output(args, capsys)
> +        assert code == 0
> diff --git a/contrib/bpf-vmtest-tool/utils.py 
> b/contrib/bpf-vmtest-tool/utils.py
> new file mode 100644
> index 00000000000..682840556f1
> --- /dev/null
> +++ b/contrib/bpf-vmtest-tool/utils.py
> @@ -0,0 +1,31 @@
> +import subprocess
> +import logging
> +from typing import Any
> +
> +logger = logging.getLogger(__name__)
> +
> +
> +def run_command(cmd: list[str], stream_output: bool = False, **kwargs: Any):
> +    cleaned_cmd = [str(item) for item in cmd if str(item).strip()]
> +    capture_cmd_output = not stream_output
> +    try:
> +        logger.debug(f"running command: {cleaned_cmd}")
> +        result = subprocess.run(
> +            cleaned_cmd,
> +            text=True,
> +            check=True,
> +            capture_output=capture_cmd_output,
> +            shell=False,
> +            **kwargs,
> +        )
> +        if capture_cmd_output:
> +            logger.debug("Command stdout:\n" + result.stdout.strip())
> +            if result.stderr:
> +                logger.debug("Command stderr:\n" + result.stderr.strip())
> +        return result
> +    except subprocess.CalledProcessError as e:
> +        logger.error(e)
> +        if capture_cmd_output:
> +            logger.error("Command failed with stdout:\n" + e.stdout.strip())
> +            logger.error("Command failed with stderr:\n" + e.stderr.strip())
> +        raise
> diff --git a/contrib/bpf-vmtest-tool/vm.py b/contrib/bpf-vmtest-tool/vm.py
> new file mode 100644
> index 00000000000..ba3bdecf94b
> --- /dev/null
> +++ b/contrib/bpf-vmtest-tool/vm.py
> @@ -0,0 +1,169 @@
> +import logging
> +import subprocess
> +from typing import List
> +
> +from kernel import KernelImage
> +
> +logger = logging.getLogger(__name__)
> +
> +
> +class VMConfig:
> +    """Configuration container for VM settings"""
> +
> +    def __init__(
> +        self, kernel_image: KernelImage, rootfs_path: str, command: str, 
> **kwargs
> +    ):
> +        self.kernel = kernel_image
> +        self.kernel_path = str(kernel_image.path)
> +        self.rootfs_path = rootfs_path
> +        self.command = command
> +        self.memory_mb = kwargs.get("memory_mb", 512)
> +        self.cpu_count = kwargs.get("cpu_count", 1)
> +        self.extra_args = kwargs.get("extra_args", {})
> +
> +
> +def bpf_verifier_logs(output: str) -> str:
> +    start_tag = "--- Verifier log start ---"
> +    end_tag = "--- Verifier log end ---"
> +
> +    start_idx = output.find(start_tag)
> +    end_idx = output.find(end_tag)
> +
> +    if start_idx != -1 and end_idx != -1:
> +        # Extract between the tags (excluding the markers themselves)
> +        log_body = output[start_idx + len(start_tag) : end_idx].strip()
> +        return log_body
> +    else:
> +        return "No verifier log found in the output."
> +
> +
> +class Vmtest:
> +    """vmtest backend implementation"""
> +
> +    def __init__(self):
> +        pass
> +
> +    def _boot_command(self, vm_config: VMConfig):
> +        vmtest_command = ["vmtest"]
> +        vmtest_command.extend(["-k", vm_config.kernel_path])
> +        vmtest_command.extend(["-r", vm_config.rootfs_path])
> +        vmtest_command.append(vm_config.command)
> +        return vmtest_command
> +
> +    def _remove_boot_log(self, full_output: str) -> str:
> +        """
> +        Filters QEMU and kernel boot logs, returning only the output after 
> the
> +        `===> Running command` marker.
> +        """
> +        marker = "===> Running command"
> +        lines = full_output.splitlines()
> +
> +        try:
> +            start_index = next(i for i, line in enumerate(lines) if marker 
> in line)
> +            # Return everything after that marker (excluding the marker 
> itself)
> +            return "\n".join(lines[start_index + 1 :]).strip()
> +        except StopIteration:
> +            return full_output.strip()
> +
> +    def run_command(self, vm_config):
> +        vm = None
> +        try:
> +            logger.info(f"Booting VM with kernel: {vm_config.kernel_path}")
> +            logger.info(f"Using rootfs: {vm_config.rootfs_path}")
> +            vm = subprocess.run(
> +                self._boot_command(vm_config),
> +                check=True,
> +                text=True,
> +                capture_output=True,
> +                shell=False,
> +            )
> +            vm_stdout = vm.stdout
> +            logger.debug(vm_stdout)
> +            return VMCommandResult(
> +                vm.returncode, self._remove_boot_log(vm_stdout), None
> +            )
> +        except FileNotFoundError:
> +            raise BootFailedError(
> +                "vmtest command not found in PATH. Please ensure vmtest is 
> installed and available in your system PATH."
> +            )
> +        except subprocess.CalledProcessError as e:
> +            out = e.stdout
> +            err = e.stderr
> +            # when the command in the vm fails we consider it as a 
> successful boot
> +            if "===> Running command" not in out:
> +                raise BootFailedError("Boot failed", out, err, e.returncode)
> +            logger.debug("STDOUT: \n%s", out)
> +            logger.debug("STDERR: \n%s", err)
> +            return VMCommandResult(e.returncode, self._remove_boot_log(out), 
> err)
> +
> +
> +class VMCommandResult:
> +    def __init__(self, returncode, stdout, stderr) -> None:
> +        self.returncode = returncode
> +        self.stdout = stdout
> +        self.stderr = stderr
> +
> +
> +class VirtualMachine:
> +    """Main VM class - simple interface for end users"""
> +
> +    # Registry of available hypervisors
> +    _hypervisors = {
> +        "vmtest": Vmtest,
> +    }
> +
> +    def __init__(
> +        self,
> +        kernel_image: KernelImage,
> +        rootfs_path: str,
> +        command: str,
> +        hypervisor_type: str = "vmtest",
> +        **kwargs,
> +    ):
> +        self.config = VMConfig(kernel_image, rootfs_path, command, **kwargs)
> +
> +        if hypervisor_type not in self._hypervisors:
> +            raise ValueError(f"Unsupported hypervisor: {hypervisor_type}")
> +
> +        self.hypervisor = self._hypervisors[hypervisor_type]()
> +
> +    @classmethod
> +    def list_hypervisors(cls) -> List[str]:
> +        """List available hypervisors"""
> +        return list(cls._hypervisors.keys())
> +
> +    def execute(self):
> +        """Execute command in VM"""
> +        return self.hypervisor.run_command(self.config)
> +
> +
> +class BootFailedError(Exception):
> +    """Raised when VM fails to boot properly (before command execution)."""
> +
> +    def __init__(
> +        self, message: str, stdout: str = "", stderr: str = "", returncode: 
> int = -1
> +    ):
> +        super().__init__(message)
> +        self.stdout = stdout
> +        self.stderr = stderr
> +        self.returncode = returncode
> +
> +    def __str__(self):
> +        base = super().__str__()
> +
> +        output_parts = [
> +            base,
> +            f"Return code: {self.returncode}",
> +        ]
> +
> +        optional_sections = [
> +            ("STDOUT", self.stdout),
> +            ("STDERR", self.stderr),
> +        ]
> +
> +        for header, content in optional_sections:
> +            if content:
> +                output_parts.append(f"--- {header} ---")
> +                output_parts.append(content)
> +
> +        return "\n".join(output_parts)

Reply via email to