larroy commented on a change in pull request #10917: [MXNET-416] Add docker cache
URL: https://github.com/apache/incubator-mxnet/pull/10917#discussion_r187911986
 
 

 ##########
 File path: ci/docker_cache.py
 ##########
 @@ -0,0 +1,267 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Utility to handle distributed docker cache
+"""
+
+import os
+import logging
+import argparse
+import sys
+import boto3
+import tempfile
+import pprint
+import threading
+import build as build_util
+import botocore
+import subprocess
+from botocore.handlers import disable_signing
+from subprocess import call, check_call, CalledProcessError
+from joblib import Parallel, delayed
+
+S3_METADATA_IMAGE_ID_KEY = 'docker-image-id'
+LOG_PROGRESS_PERCENTAGE_THRESHOLD = 10
+
+cached_aws_session = None
+
+
+class ProgressPercentage(object):
+    def __init__(self, object_name, size):
+        self._object_name = object_name
+        self._size = size
+        self._seen_so_far = 0
+        self._last_percentage = 0
+        self._lock = threading.Lock()
+
+    def __call__(self, bytes_amount):
+        # To simplify we'll assume this is hooked up
+        # to a single filename.
+        with self._lock:
+            self._seen_so_far += bytes_amount
+            percentage = int((self._seen_so_far / self._size) * 100)
+            if (percentage - self._last_percentage) >= LOG_PROGRESS_PERCENTAGE_THRESHOLD:
+                self._last_percentage = percentage
+                logging.info('{}% of {}'.format(percentage, self._object_name))
+
+
+def build_save_containers(platforms, bucket):
+    if len(platforms) == 0:
+        return
+
+    platform_results = Parallel(n_jobs=len(platforms), backend="multiprocessing")(
+        delayed(_build_save_container)(platform, bucket)
+        for platform in platforms)
+
+    is_error = False
+    for platform_result in platform_results:
+        if platform_result is not None:
+            logging.error('Failed to generate {}'.format(platform_result))
+            is_error = True
+
+    return 1 if is_error else 0
+
+
+def _build_save_container(platform, bucket):
+    """
+    Build image for passed platform and upload the cache to the specified S3 bucket
+    :param platform: Platform
+    :param bucket: Target s3 bucket
+    :return: Platform if failed, None otherwise
+    """
+    docker_tag = build_util.get_docker_tag(platform)
+
+    # Preload cache
+    # TODO: Allow to disable this in order to allow clean rebuilds
+    load_docker_cache(bucket_name=bucket, docker_tag=docker_tag)
+
+    # Start building
+    logging.debug('Building {} as {}'.format(platform, docker_tag))
+    try:
+        image_id = build_util.build_docker(docker_binary='docker', platform=platform)
+        logging.info('Built {} as {}'.format(docker_tag, image_id))
+
+        # Compile and upload tarfile
+        _compile_upload_cache_file(bucket_name=bucket, docker_tag=docker_tag, image_id=image_id)
+        return None
+    except Exception:
+        logging.exception('Unexpected exception during build of {}'.format(docker_tag))
+        return platform
+        # Error handling is done by returning the erroneous platform name. This is necessary
+        # due to Parallel being unable to handle exceptions
+
+
+def _compile_upload_cache_file(bucket_name, docker_tag, image_id):
+    session = _get_aws_session()
+    s3_object = session.resource('s3').Object(bucket_name, docker_tag)
+
+    remote_image_id = _get_remote_image_id(s3_object)
+    if remote_image_id == image_id:
+        logging.info('{} ({}) for {} has not been updated - skipping'.format(docker_tag, image_id, docker_tag))
+        return
+    else:
+        logging.debug('Cached image {} differs from local {} for {}'.format(remote_image_id, image_id, docker_tag))
+
+    # Compile layers into tarfile
+    with tempfile.TemporaryDirectory() as temp_dir:
+        tar_file_path = _format_docker_cache_filepath(output_dir=temp_dir, docker_tag=docker_tag)
+        logging.debug('Writing layers of {} to {}'.format(docker_tag, tar_file_path))
+        history_cmd = ['docker', 'history', '-q', docker_tag]
+
+        image_ids_b = subprocess.check_output(history_cmd)
+        image_ids_str = image_ids_b.decode('utf-8').strip()
+        layer_ids = [id.strip() for id in image_ids_str.split('\n') if id != '<missing>']
+
+        # docker_tag is important to preserve the image name. Otherwise, the --cache-from feature will not be able to
+        # reference the loaded cache later on. The other layer ids are added to ensure all intermediary layers
+        # are preserved to allow resuming the cache at any point
+        cmd = ['docker', 'save', '-o', tar_file_path, docker_tag]
+        cmd.extend(layer_ids)
+        try:
+            check_call(cmd)
+        except CalledProcessError as e:
+            logging.error('Error during save of {} at {}. Command: {}'.
+                          format(docker_tag, tar_file_path, pprint.pformat(cmd)))
+            return
+
+        # Upload file
+        logging.info('Uploading {} to S3'.format(docker_tag))
+        with open(tar_file_path, 'rb') as data:
+            s3_object.upload_fileobj(
+                Fileobj=data,
+                Callback=ProgressPercentage(object_name=docker_tag, size=os.path.getsize(tar_file_path)),
+                ExtraArgs={"Metadata": {S3_METADATA_IMAGE_ID_KEY: image_id}})
+            logging.info('Uploaded {} to S3'.format(docker_tag))
+
+def _get_remote_image_id(s3_object):
+    """
+    Get the image id of the docker cache which is represented by the S3 object
+    :param s3_object: S3 object
+    :return: Image id as string or None if object does not exist
+    """
+    try:
+        if S3_METADATA_IMAGE_ID_KEY in s3_object.metadata:
+            cached_image_id = s3_object.metadata[S3_METADATA_IMAGE_ID_KEY]
+            return cached_image_id
+        else:
+            logging.debug('No cached image available for {}'.format(s3_object.key))
+    except botocore.exceptions.ClientError as e:
+        if e.response['Error']['Code'] == "404":
+            logging.debug('{} does not exist in S3 yet'.format(s3_object.key))
+        else:
+            raise
+
+    return None
+
+
+def load_docker_cache(bucket_name, docker_tag):
 
 Review comment:
   missing return type
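   
   A minimal sketch of how that could be addressed, assuming load_docker_cache returns nothing (its body is not part of this hunk, so the docstring below is hypothetical), following the :param:/:return: docstring style used elsewhere in this file:
   
       def load_docker_cache(bucket_name, docker_tag) -> None:
           """
           Load the pre-built docker cache for the given tag from the S3 bucket
           :param bucket_name: Source S3 bucket
           :param docker_tag: Docker tag whose cached layers should be loaded
           :return: None
           """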

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services