KellenSunderland closed pull request #12333: WIP: Zombie fix
URL: https://github.com/apache/incubator-mxnet/pull/12333
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/ci/build.py b/ci/build.py
index a9d6a63537f..486d2b3beda 100755
--- a/ci/build.py
+++ b/ci/build.py
@@ -23,7 +23,7 @@
"""
__author__ = 'Marco de Abreu, Kellen Sunderland, Anton Chernov, Pedro Larroy'
-__version__ = '0.1'
+__version__ = '0.3'
import argparse
import glob
@@ -40,6 +40,51 @@
from subprocess import call, check_call
from typing import *
from util import *
+import docker
+import docker.models
+import docker.errors
+import signal
+import atexit
+
+
+class Cleanup:
+ """A class to cleanup containers"""
+ def __init__(self):
+ self.containers = set()
+ self.docker_stop_timeout = 3
+
+ def add_container(self, container: docker.models.containers.Container):
+ assert isinstance(container, docker.models.containers.Container)
+ self.containers.add(container)
+
+ def remove_container(self, container: docker.models.containers.Container):
+ assert isinstance(container, docker.models.containers.Container)
+ self.containers.remove(container)
+
+ def _cleanup_containers(self):
+ if self.containers:
+ logging.warning("Cleaning up containers")
+ else:
+ return
+ try:
+ stop_timeout = int(os.environ.get("DOCKER_STOP_TIMEOUT",
self.docker_stop_timeout))
+ except Exception:
+ stop_timeout = 3
+ for container in self.containers:
+ try:
+ container.stop(timeout=stop_timeout)
+ logging.info("☠: stopped container %s",
trim_container_id(container.id))
+ container.remove()
+ logging.info("🚽: removed container %s",
trim_container_id(container.id))
+ except Exception as e:
+ logging.exception(e)
+ #pass
+ self.containers.clear()
+ logging.info("Cleaning up containers finished.")
+
+ def __call__(self):
+ """Perform cleanup"""
+ self._cleanup_containers()
CCACHE_MAXSIZE = '500G'
@@ -91,7 +136,7 @@ def build_docker(platform: str, docker_binary: str,
registry: str, num_retries:
# docker pull see: docker_cache.load_docker_cache
#
# This doesn't work with multi head docker files.
- #
+ #
for i in range(num_retries):
logging.info('%d out of %d tries to build the docker image.', i + 1,
num_retries)
@@ -158,15 +203,36 @@ def default_ccache_dir() -> str:
return ccache_dir
return os.path.join(tempfile.gettempdir(), "ci_ccache")
+def trim_container_id(cid):
+ return cid[:12]
def container_run(platform: str,
- docker_binary: str,
+ nvidia_runtime: bool,
docker_registry: str,
shared_memory_size: str,
local_ccache_dir: str,
command: List[str],
+ cleanup: Cleanup,
dry_run: bool = False,
- interactive: bool = False) -> str:
+ interactive: bool = False) -> int:
+ CONTAINER_WAIT_S = 600
+ #
+ # Environment setup
+ #
+ environment = {
+ 'CCACHE_MAXSIZE': '500G',
+ 'CCACHE_TEMPDIR': '/tmp/ccache', # temp dir should be local and not
shared
+ 'CCACHE_DIR': '/work/ccache', # this path is inside the container as
/work/ccache is mounted
+ 'CCACHE_LOGFILE': '/tmp/ccache.log', # a container-scoped log, useful
for ccache verification.
+ }
+    # These variables are passed to the container so the process tree killer can find runaway processes inside the container
+ # https://wiki.jenkins.io/display/JENKINS/ProcessTreeKiller
+ #
https://github.com/jenkinsci/jenkins/blob/578d6bacb33a5e99f149de504c80275796f0b231/core/src/main/java/hudson/model/Run.java#L2393
+ #
+ JENKINS_ENV_VARS = ['BUILD_NUMBER', 'BUILD_ID', 'BUILD_TAG']
+ environment.update({k: os.environ[k] for k in JENKINS_ENV_VARS if k in
os.environ})
+ environment.update({k: os.environ[k] for k in ['CCACHE_MAXSIZE'] if k in
os.environ})
+
tag = get_docker_tag(platform=platform, registry=docker_registry)
mx_root = get_mxnet_root()
local_build_folder = buildir()
@@ -174,42 +240,100 @@ def container_run(platform: str,
os.makedirs(local_build_folder, exist_ok=True)
os.makedirs(local_ccache_dir, exist_ok=True)
logging.info("Using ccache directory: %s", local_ccache_dir)
- runlist = [docker_binary, 'run', '--rm', '-t',
+ docker_client = docker.from_env()
+ # Equivalent command
+ docker_cmd_list = [get_docker_binary(nvidia_runtime), 'run',
+ '--rm',
'--shm-size={}'.format(shared_memory_size),
'-v', "{}:/work/mxnet".format(mx_root), # mount mxnet root
'-v', "{}:/work/build".format(local_build_folder), # mount
mxnet/build for storing build artifacts
'-v', "{}:/work/ccache".format(local_ccache_dir),
'-u', '{}:{}'.format(os.getuid(), os.getgid()),
- '-e', 'CCACHE_MAXSIZE={}'.format(CCACHE_MAXSIZE),
- '-e', 'CCACHE_TEMPDIR=/tmp/ccache', # temp dir should be local
and not shared
- '-e', "CCACHE_DIR=/work/ccache", # this path is inside the
container as /work/ccache is mounted
- '-e', "CCACHE_LOGFILE=/tmp/ccache.log", # a container-scoped
log, useful for ccache verification.
+ '-e', 'CCACHE_MAXSIZE={}'.format(environment['CCACHE_MAXSIZE']),
+ '-e',
'CCACHE_TEMPDIR={}'.format(environment['CCACHE_TEMPDIR']), # temp dir should
be local and not shared
+ '-e', "CCACHE_DIR={}".format(environment['CCACHE_DIR']), #
this path is inside the container as /work/ccache is mounted
+ '-e',
"CCACHE_LOGFILE={}".format(environment['CCACHE_LOGFILE']), # a
container-scoped log, useful for ccache verification.
+ '-ti',
tag]
- runlist.extend(command)
- cmd = '\\\n\t'.join(runlist)
- ret = 0
- if not dry_run and not interactive:
- logging.info("Running %s in container %s", command, tag)
- logging.info("Executing:\n%s\n", cmd)
- ret = call(runlist)
-
- docker_run_cmd = ' '.join(runlist)
- if not dry_run and interactive:
- into_cmd = deepcopy(runlist)
- # -ti can't be after the tag, as is interpreted as a command so hook
it up after the -u argument
- idx = into_cmd.index('-u') + 2
- into_cmd[idx:idx] = ['-ti']
- cmd = '\\\n\t'.join(into_cmd)
- logging.info("Executing:\n%s\n", cmd)
- docker_run_cmd = ' '.join(into_cmd)
- ret = call(into_cmd)
-
- if not dry_run and not interactive and ret != 0:
- logging.error("Running of command in container failed (%s):\n%s\n",
ret, cmd)
- logging.error("You can get into the container by adding the -i option")
- raise subprocess.CalledProcessError(ret, cmd)
-
- return docker_run_cmd
+ docker_cmd_list.extend(command)
+ docker_cmd = ' \\\n\t'.join(docker_cmd_list)
+ logging.info("Running %s in container %s", command, tag)
+ logging.info("Executing the equivalent of:\n%s\n", docker_cmd)
+ ret = 0 # return code of the command inside docker
+ if not dry_run:
+
+
+ #############################
+ #
+ signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT,
signal.SIGTERM})
+ if nvidia_runtime:
+ runtime='nvidia'
+ else:
+ # runc is default (docker info | grep -i runtime)
+ runtime=None
+
+ container = docker_client.containers.run(
+ tag,
+ runtime=runtime,
+ detach=True,
+ command=command,
+ #auto_remove=True,
+ shm_size=shared_memory_size,
+ user='{}:{}'.format(os.getuid(), os.getgid()),
+ volumes={
+ mx_root:
+ {'bind': '/work/mxnet', 'mode': 'rw'},
+ local_build_folder:
+ {'bind': '/work/build', 'mode': 'rw'},
+ local_ccache_dir:
+ {'bind': '/work/ccache', 'mode': 'rw'},
+ },
+ environment=environment)
+ logging.info("Started container: %s", trim_container_id(container.id))
+        # Race condition:
+        # If the previous call is interrupted then it's possible that the container is not cleaned up.
+        # We avoid this by masking the signals temporarily.
+ cleanup.add_container(container)
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT,
signal.SIGTERM})
+ #
+ #############################
+
+ stream = container.logs(stream=True, stdout=True, stderr=True)
+ sys.stdout.flush()
+ for chunk in stream:
+ sys.stdout.buffer.write(chunk)
+ sys.stdout.buffer.flush()
+ sys.stdout.flush()
+ stream.close()
+ try:
+ logging.info("Waiting for status of container %s for %d s.",
trim_container_id(container.id), CONTAINER_WAIT_S)
+ wait_result = container.wait(timeout=CONTAINER_WAIT_S)
+ logging.info("Container exit status: %s", wait_result)
+ ret = wait_result.get('StatusCode', 200)
+ except Exception as e:
+ logging.exception(e)
+ ret = 150
+
+ # Stop
+ try:
+ logging.info("Stopping container: %s",
trim_container_id(container.id))
+ container.stop()
+ except Exception as e:
+ logging.exception(e)
+ ret = 151
+
+ # Remove
+ try:
+ logging.info("Removing container: %s",
trim_container_id(container.id))
+ container.remove()
+ except Exception as e:
+ logging.exception(e)
+ ret = 152
+ cleanup.remove_container(container)
+ containers = docker_client.containers.list()
+ if containers:
+ logging.info("Other running containers: %s",
[trim_container_id(x.id) for x in containers])
+ return ret
def list_platforms() -> str:
@@ -305,6 +429,19 @@ def use_cache():
shared_memory_size = args.shared_memory_size
num_docker_build_retires = args.docker_build_retries
+ # Cleanup on signals and exit
+ cleanup = Cleanup()
+ def signal_handler(signum, _):
+ signal.pthread_sigmask(signal.SIG_BLOCK, {signum})
+ logging.warning("Signal %d received, cleaning up...", signum)
+ cleanup()
+ logging.warning("done. Exiting with error.")
+ sys.exit(1)
+
+ atexit.register(cleanup)
+ signal.signal(signal.SIGTERM, signal_handler)
+ signal.signal(signal.SIGINT, signal_handler)
+
if args.list:
list_platforms()
elif args.platform:
@@ -317,26 +454,32 @@ def use_cache():
logging.warning("Container was just built. Exiting due to
build-only.")
return 0
+ ret = 0
if command:
- container_run(platform=platform, docker_binary=docker_binary,
shared_memory_size=shared_memory_size,
- command=command,
docker_registry=args.docker_registry,
- local_ccache_dir=args.ccache_dir,
interactive=args.interactive)
+ ret = container_run(platform=platform,
nvidia_runtime=args.nvidiadocker,
+ shared_memory_size=args.shared_memory_size, command=command,
docker_registry=args.docker_registry,
+ local_ccache_dir=args.ccache_dir,
interactive=args.interactive, cleanup=cleanup)
elif args.print_docker_run:
- print(container_run(platform=platform,
docker_binary=docker_binary, shared_memory_size=shared_memory_size,
- command=[], dry_run=True,
docker_registry=args.docker_registry, local_ccache_dir=args.ccache_dir))
+ ret = container_run(platform=platform,
nvidia_runtime=args.nvidiadocker,
+ shared_memory_size=args.shared_memory_size, command=[],
dry_run=True, docker_registry=args.docker_registry,
+ local_ccache_dir=args.ccache_dir)
+ command=[]
elif args.interactive:
- container_run(platform=platform, docker_binary=docker_binary,
shared_memory_size=shared_memory_size,
+ ret = container_run(platform=platform,
nvidia_runtime=args.nvidiadocker, shared_memory_size=shared_memory_size,
command=command,
docker_registry=args.docker_registry,
- local_ccache_dir=args.ccache_dir,
interactive=args.interactive)
-
+ local_ccache_dir=args.ccache_dir,
interactive=args.interactive, cleanup=cleanup)
else:
# With no commands, execute a build function for the target
platform
assert not args.interactive, "when running with -i must provide a
command"
- cmd = ["/work/mxnet/ci/docker/runtime_functions.sh",
"build_{}".format(platform)]
- logging.info("No command specified, trying default build: %s", '
'.join(cmd))
- container_run(platform=platform, docker_binary=docker_binary,
shared_memory_size=shared_memory_size,
- command=cmd, docker_registry=args.docker_registry,
- local_ccache_dir=args.ccache_dir)
+
+ command = ["/work/mxnet/ci/docker/runtime_functions.sh",
"build_{}".format(platform)]
+ logging.info("No command specified, trying default build: %s", '
'.join(command))
+ ret = container_run(platform=platform,
nvidia_runtime=args.nvidiadocker, shared_memory_size=args.shared_memory_size,
+ command=command, docker_registry=args.docker_registry,
local_ccache_dir=args.ccache_dir, cleanup=cleanup)
+
+ if ret != 0:
+ logging.critical("Execution of %s failed with status: %d",
command, ret)
+ return(ret)
elif args.all:
platforms = get_platforms()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services