areusch commented on a change in pull request #8380:
URL: https://github.com/apache/tvm/pull/8380#discussion_r677889527
##########
File path: apps/microtvm/zephyr/template_project/microtvm_api_server.py
##########
@@ -0,0 +1,533 @@
+import collections
+import fcntl
+import logging
+import os
+import os.path
+import pathlib
+import re
+import select
+import shlex
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+
+import serial
+import serial.tools.list_ports
+import yaml
+
+from tvm.micro.project_api import server
+
+
+_LOG = logging.getLogger(__name__)
+
+
+API_SERVER_DIR = pathlib.Path(os.path.dirname(__file__) or os.path.getcwd())
+
+
+BUILD_DIR = API_SERVER_DIR / "build"
+
+
+MODEL_LIBRARY_FORMAT_RELPATH = "model.tar"
+
+
+IS_TEMPLATE = not (API_SERVER_DIR / MODEL_LIBRARY_FORMAT_RELPATH).exists()
+
+
+def check_call(cmd_args, *args, **kwargs):
+ cwd_str = '' if 'cwd' not in kwargs else f" (in cwd: {kwargs['cwd']})"
+ _LOG.info("run%s: %s", cwd_str, " ".join(shlex.quote(a) for a in cmd_args))
+ return subprocess.check_call(cmd_args, *args, **kwargs)
+
+
+CACHE_ENTRY_RE = re.compile(r"(?P<name>[^:]+):(?P<type>[^=]+)=(?P<value>.*)")
+
+
+CMAKE_BOOL_MAP = dict(
+ [(k, True) for k in ("1", "ON", "YES", "TRUE", "Y")]
+ + [(k, False) for k in ("0", "OFF", "NO", "FALSE", "N", "IGNORE", "NOTFOUND", "")]
+)
+
+
+class CMakeCache(collections.Mapping):
+
+ def __init__(self, path):
+ self._path = path
+ self._dict = None
+
+ def __iter__(self):
+ return iter(self._dict)
+
+ def __getitem__(self, key):
+ if self._dict is None:
+ self._dict = self._read_cmake_cache()
+
+ return self._dict[key]
+
+ def __len__(self):
+ return len(self._dict)
+
+ def _read_cmake_cache(self):
+ """Read a CMakeCache.txt-like file and return a dictionary of values."""
+ entries = collections.OrderedDict()
+ with open(self._path, encoding="utf-8") as f:
+ for line in f:
+ m = CACHE_ENTRY_RE.match(line.rstrip("\n"))
+ if not m:
+ continue
+
+ if m.group("type") == "BOOL":
+ value = CMAKE_BOOL_MAP[m.group("value").upper()]
+ else:
+ value = m.group("value")
+
+ entries[m.group("name")] = value
+
+ return entries
+
+
+CMAKE_CACHE = CMakeCache(BUILD_DIR / "CMakeCache.txt")
+
+
+class BoardError(Exception):
+ """Raised when an attached board cannot be opened (i.e. missing /dev nodes, etc)."""
+
+
+class BoardAutodetectFailed(Exception):
+ """Raised when no attached hardware is found matching the board= given to ZephyrCompiler."""
+
+
+def _get_flash_runner():
+ flash_runner = CMAKE_CACHE.get("ZEPHYR_BOARD_FLASH_RUNNER")
+ if flash_runner is not None:
+ return flash_runner
+
+ with open(CMAKE_CACHE["ZEPHYR_RUNNERS_YAML"]) as f:
+ doc = yaml.load(f, Loader=yaml.FullLoader)
+ return doc["flash-runner"]
+
+
+def _get_device_args(options, cmake_entries):
+ flash_runner = _get_flash_runner()
+
+ if flash_runner == "nrfjprog":
+ return _get_nrf_device_args(options)
+
+ if flash_runner == "openocd":
+ return _get_openocd_device_args(options)
+
+ raise BoardError(
+ f"Don't know how to find serial terminal for board {CMAKE_CACHE['BOARD']} with flash "
+ f"runner {flash_runner}")
+
+# kwargs passed to usb.core.find to find attached boards for the openocd flash runner.
+BOARD_USB_FIND_KW = {
+ "nucleo_f746zg": {"idVendor": 0x0483, "idProduct": 0x374B},
+ "stm32f746g_disco": {"idVendor": 0x0483, "idProduct": 0x374B},
+}
+
+def openocd_serial(options):
+ """Find the serial port to use for a board with OpenOCD flash strategy."""
+ if "openocd_serial" in options:
+ return options["openocd_serial"]
+
+ import usb # pylint: disable=import-outside-toplevel
+
+ find_kw = BOARD_USB_FIND_KW[CMAKE_CACHE["BOARD"]]
+ boards = usb.core.find(find_all=True, **find_kw)
+ serials = []
+ for b in boards:
+ serials.append(b.serial_number)
+
+ if len(serials) == 0:
+ raise BoardAutodetectFailed(f"No attached USB devices matching: {find_kw!r}")
+ serials.sort()
+
+ autodetected_openocd_serial = serials[0]
+ _LOG.debug("zephyr openocd driver: autodetected serial %s", serials[0])
+
+ return autodetected_openocd_serial
+
+
+def _get_openocd_device_args(options):
+ return ["--serial", openocd_serial(options)]
+
+
+def _get_nrf_device_args(options):
+ nrfjprog_args = ["nrfjprog", "--ids"]
+ nrfjprog_ids = subprocess.check_output(nrfjprog_args, encoding="utf-8")
+ if not nrfjprog_ids.strip("\n"):
+ raise BoardAutodetectFailed(
+ f'No attached boards recognized by {" ".join(nrfjprog_args)}'
+ )
+
+ boards = nrfjprog_ids.split("\n")[:-1]
+ if len(boards) > 1:
+ if options['nrfjprog_snr'] is None:
+ raise BoardError(
+ "Multiple boards connected; specify one with nrfjprog_snr=: "
+ f'{", ".join(boards)}'
+ )
+
+ if str(options['nrfjprog_snr']) not in boards:
+ raise BoardError(
+ f"nrfjprog_snr ({options['nrfjprog_snr']}) not found in {nrfjprog_args}: {boards}"
+ )
+
+ return ["--snr", options['nrfjprog_snr']]
+
+ if not boards:
+ return []
+
+ return ["--snr", boards[0]]
+
+
+PROJECT_OPTIONS = [
+ server.ProjectOption("gdbserver_port", help=("If given, port number to use when running the "
+ "local gdbserver")),
+ server.ProjectOption("openocd_serial", help=("When used with OpenOCD targets, serial # of the "
+ "attached board to use")),
+ server.ProjectOption("nrfjprog_snr", help=("When used with nRF targets, serial # of the "
+ "attached board to use, from nrfjprog")),
+ server.ProjectOption("verbose", help="Run build with verbose output"),
+ server.ProjectOption("west_cmd",
+ help=("Path to the west tool. If given, supersedes both the zephyr_base "
+ "option and ZEPHYR_BASE environment variable.")),
+ server.ProjectOption("zephyr_base",
+ help="Path to the zephyr base directory."),
+ server.ProjectOption("zephyr_board", help="Name of the Zephyr board to build for"),
+]
+
+
+class Handler(server.ProjectAPIHandler):
+
+ def __init__(self):
+ super(Handler, self).__init__()
+ self._proc = None
+
+ def server_info_query(self):
+ return server.ServerInfo(
+ platform_name="zephyr",
+ is_template=IS_TEMPLATE,
+ model_library_format_path="" if IS_TEMPLATE else (API_SERVER_DIR / MODEL_LIBRARY_FORMAT_RELPATH),
+ project_options=PROJECT_OPTIONS)
+
+ # These files and directories will be recursively copied into generated projects from the CRT.
+ CRT_COPY_ITEMS = ("include", "Makefile", "src")
+
+ def generate_project(self, model_library_format_path, standalone_crt_dir, project_dir, options):
+ project_dir = pathlib.Path(project_dir)
+ # Make project directory.
+ project_dir.mkdir()
+
+ # Copy ourselves to the generated project. TVM may perform further build steps on the generated project
+ # by launching the copy.
+ shutil.copy2(__file__, project_dir / os.path.basename(__file__))
+
+ # Place Model Library Format tarball in the special location, which this script uses to decide
+ # whether it's being invoked in a template or generated project.
+ project_model_library_format_tar_path = project_dir / MODEL_LIBRARY_FORMAT_RELPATH
+ shutil.copy2(model_library_format_path, project_model_library_format_tar_path)
+
+ shutil.copytree(API_SERVER_DIR / "boards", project_dir / "boards")
+
+ # Extract Model Library Format tarball.into <project_dir>/model.
+ extract_path = os.path.splitext(project_model_library_format_tar_path)[0]
+ with tarfile.TarFile(project_model_library_format_tar_path) as tf:
+ os.makedirs(extract_path)
+ tf.extractall(path=extract_path)
+
+ if self._is_qemu(options):
+ shutil.copytree(API_SERVER_DIR / "qemu-hack", project_dir / "qemu-hack")
+
+ # Populate CRT.
+ crt_path = project_dir / "crt"
+ crt_path.mkdir()
+ for item in self.CRT_COPY_ITEMS:
+ src_path = os.path.join(standalone_crt_dir, item)
+ dst_path = crt_path / item
+ if os.path.isdir(src_path):
+ shutil.copytree(src_path, dst_path)
+ else:
+ shutil.copy2(src_path, dst_path)
+
+ # Populate Makefile.
+ shutil.copy2(API_SERVER_DIR / "CMakeLists.txt", project_dir / "CMakeLists.txt")
+ shutil.copy2(API_SERVER_DIR / "prj.conf", project_dir / "prj.conf")
+
+ # Populate crt-config.h
+ crt_config_dir = project_dir / "crt_config"
+ crt_config_dir.mkdir()
+ shutil.copy2(API_SERVER_DIR / "crt_config" / "crt_config.h", crt_config_dir / "crt_config.h")
+
+ # Populate src/
+ src_dir = project_dir / "src"
+ src_dir.mkdir()
+ shutil.copy2(API_SERVER_DIR / "src" / "main.c", src_dir / "main.c")
+
+ def build(self, options):
+ BUILD_DIR.mkdir()
+
+ cmake_args = ["cmake", ".."]
+ if options.get("verbose"):
+ cmake_args.append("-DCMAKE_VERBOSE_MAKEFILE:BOOL=TRUE")
+
+ if options.get("zephyr_base"):
+ cmake_args.append(f"-DZEPHYR_BASE:STRING={options['zephyr_base']}")
+
+ cmake_args.append(f"-DBOARD:STRING={options['zephyr_board']}")
+
+ check_call(cmake_args, cwd=BUILD_DIR)
+
+ args = ["make", "-j2"]
+ if options.get("verbose"):
+ args.append("VERBOSE=1")
+ check_call(args, cwd=BUILD_DIR)
+
+ @classmethod
+ def _is_qemu(cls, options):
+ return "qemu" in options["zephyr_board"]
+
+ def flash(self, options):
+ if self._is_qemu(options):
+ return # NOTE: qemu requires no flash step--it is launched from open_transport.
+
+ zephyr_board = options["zephyr_board"]
+
+ # The nRF5340DK requires an additional `nrfjprog --recover` before each flash cycle.
+ # This is because readback protection is enabled by default when this device is flashed.
+ # Otherwise, flashing may fail with an error such as the following:
+ # ERROR: The operation attempted is unavailable due to readback protection in
+ # ERROR: your device. Please use --recover to unlock the device.
+ if zephyr_board.startswith("nrf5340dk") and _get_flash_runner() == "nrfjprog":
+ recover_args = ["nrfjprog", "--recover"]
+ recover_args.extend(_get_nrf_device_args(options))
+ self._subprocess_env.run(recover_args, cwd=build_dir)
Review comment:
resolved these problems!
##########
File path: python/tvm/micro/compiler.py
##########
@@ -1,361 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
Review comment:
@mehrdadh any thoughts here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]