Repository: incubator-slider Updated Branches: refs/heads/develop 134ef53f9 -> 5bf14692d
SLIDER-1174 Support Tensorflow on Slider (Yang Wang aka. fly_in_gis via gourksaha) Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5bf14692 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5bf14692 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5bf14692 Branch: refs/heads/develop Commit: 5bf14692d83705392de594e9a199ec0e632fc0d2 Parents: 134ef53 Author: Gour Saha <gourks...@apache.org> Authored: Mon Mar 6 19:13:56 2017 -0800 Committer: Gour Saha <gourks...@apache.org> Committed: Mon Mar 6 19:13:56 2017 -0800 ---------------------------------------------------------------------- app-packages/tensorflow/README.md | 56 +++++++ app-packages/tensorflow/appConfig.default.json | 18 +++ app-packages/tensorflow/metainfo.json | 85 ++++++++++ .../tensorflow/package/files/yarn_bootstrap.py | 95 +++++++++++ app-packages/tensorflow/package/files/ymnist.py | 88 ++++++++++ .../tensorflow/package/scripts/functions.py | 160 +++++++++++++++++++ .../tensorflow/package/scripts/params.py | 54 +++++++ .../tensorflow/package/scripts/tensorflow.py | 51 ++++++ .../package/scripts/tensorflow_service.py | 109 +++++++++++++ app-packages/tensorflow/resources.default.json | 38 +++++ .../tensorflow/ytensorflow/config.default.json | 29 ++++ app-packages/tensorflow/ytensorflow/ytensorflow | 65 ++++++++ .../tensorflow/ytensorflow/ytensorflow.py | 108 +++++++++++++ 13 files changed, 956 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/README.md ---------------------------------------------------------------------- diff --git a/app-packages/tensorflow/README.md b/app-packages/tensorflow/README.md new file mode 100644 index 0000000..b6dc8f5 --- /dev/null +++ b/app-packages/tensorflow/README.md @@ -0,0 +1,56 @@ +<!--- + Licensed to the Apache 
Software Foundation (ASF) under one + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Environments +* Redhat 7 +* Linux Kernel 3.10.0 +* Docker 1.12.3 +* Hadoop 2.6.5 +* Docker image ytensorflow:0.2.1, [Dockerfile](https://github.com/tensorflow/ecosystem/blob/master/docker/Dockerfile.hdfs) + +# Use Slider to run a TensorFlow cluster +1. Make sure Slider works on your cluster; see [Getting started with Slider](https://slider.incubator.apache.org/docs/getting_started.html) +2. Download app-packages/tensorflow to $SLIDER_HOME/app-packages/tensorflow +3. Put your TensorFlow scripts under app-packages/tensorflow/package/files +4. In "appConfig.default.json", set "site.global.hadoop.conf", "site.global.user.scripts.entry" and "site.global.user.checkpoint.prefix" to match your environment +5. Adjust the resources in resources.default.json if needed +6. Usually there is no need to update metainfo.json +7. Start your TensorFlow cluster +``` +cd $SLIDER_HOME/app-packages/tensorflow +slider create [app-name] --appdef . --template appConfig.default.json --resources resources.default.json +``` + + +# Use ytensorflow to run a TensorFlow cluster +## Introduction +ytensorflow (the TensorFlow-on-YARN admin client) is used to submit and manage TensorFlow clusters on YARN. It aims to make submission easier.
+ +## Command +``` +ytensorflow cluster -start ./config.json -files ./mnist.py +ytensorflow cluster -stop <appName> +ytensorflow cluster -status <appName> +ytensorflow version +``` + +# User script requirements +The framework generates the following arguments and passes them to the user script as named command-line flags (--name=value); your script must accept them, as "ymnist.py" does through yarn_bootstrap.py +* job_name, worker or ps +* task_index +* ps_hosts +* worker_hosts +* work_dir, the application's working directory +* ckp_dir, directory for storing the checkpoints http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/appConfig.default.json ---------------------------------------------------------------------- diff --git a/app-packages/tensorflow/appConfig.default.json b/app-packages/tensorflow/appConfig.default.json new file mode 100644 index 0000000..5482254 --- /dev/null +++ b/app-packages/tensorflow/appConfig.default.json @@ -0,0 +1,18 @@ +{ + "schema": "http://example.org/specification/v2.0.0", + "metadata": { + }, + "global": { + "site.global.hadoop.conf": "/etc/hadoop/conf", + "site.global.yarn.cgroup.root": "/hadoop-yarn", + "site.global.user.scripts.entry": "ymnist.py", + "site.global.user.checkpoint.prefix": "hdfs://hdpdev/user/${USER_NAME}/.slider/cluster", + "site.global.user.name": "${USER_NAME}", + "site.global.zookeeper.quorum": "${ZK_HOST}", + "site.global.docker.image": "ytensorflow:0.2.1", + "site.global.ps.port": "${ps.ALLOCATED_PORT}{PER_CONTAINER}", + "site.global.chiefworker.port": "${chiefworker.ALLOCATED_PORT}{PER_CONTAINER}", + "site.global.worker.port": "${worker.ALLOCATED_PORT}{PER_CONTAINER}", + "site.global.tensorboard.port": "${tensorboard.ALLOCATED_PORT}{PER_CONTAINER}" + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/metainfo.json ---------------------------------------------------------------------- diff --git a/app-packages/tensorflow/metainfo.json
b/app-packages/tensorflow/metainfo.json new file mode 100644 index 0000000..631146b --- /dev/null +++ b/app-packages/tensorflow/metainfo.json @@ -0,0 +1,85 @@ +{ + "schemaVersion": "2.1", + "application": { + "name": "tensorflow", + "version": "0.1.1", + "exportGroups": [ + { + "name": "ps", + "exports": [ + { + "name": "host_port", + "value": "${ps_HOST}:${site.global.ps.port}" + } + ] + }, + { + "name": "chiefworker", + "exports": [ + { + "name": "host_port", + "value": "${chiefworker_HOST}:${site.global.chiefworker.port}" + } + ] + }, + { + "name": "worker", + "exports": [ + { + "name": "host_port", + "value": "${worker_HOST}:${site.global.worker.port}" + } + ] + }, + { + "name": "tensorboard", + "exports": [ + { + "name": "url", + "value": "http://${tensorboard_HOST}:${site.global.tensorboard.port}" + } + ] + } + ], + "components": [ + { + "name": "ps", + "compExports": "ps-host_port", + "commandScript": { + "script": "scripts/tensorflow.py", + "scriptType": "PYTHON" + } + }, + { + "name": "chiefworker", + "compExports": "chiefworker-host_port", + "commandScript": { + "script": "scripts/tensorflow.py", + "scriptType": "PYTHON" + } + }, + { + "name": "worker", + "compExports": "worker-host_port", + "commandScript": { + "script": "scripts/tensorflow.py", + "scriptType": "PYTHON" + } + }, + { + "name": "tensorboard", + "compExports": "tensorboard-url", + "commandScript": { + "script": "scripts/tensorflow.py", + "scriptType": "PYTHON" + } + } + ], + "packages": [ + { + "type": "folder", + "name": "files" + } + ] + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/files/yarn_bootstrap.py ---------------------------------------------------------------------- diff --git a/app-packages/tensorflow/package/files/yarn_bootstrap.py b/app-packages/tensorflow/package/files/yarn_bootstrap.py new file mode 100644 index 0000000..8587049 --- /dev/null +++ b/app-packages/tensorflow/package/files/yarn_bootstrap.py @@ -0,0 
+1,95 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import time
+from abc import abstractmethod, ABCMeta
+import tensorflow as tf
+
+flags = tf.app.flags
+# Flags for configuring the task; tensorflow_service.py passes all six on the command line
+flags.DEFINE_string("job_name", None, "job name: worker or ps")
+flags.DEFINE_integer("task_index", 0, "Worker task index, should be >= 0")
+flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
+flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
+flags.DEFINE_string("ckp_dir", None, "Directory for storing the checkpoints")
+flags.DEFINE_string("work_dir", "/tmp/tf_on_yarn", "Work directory")
+
+FLAGS = flags.FLAGS
+
+class YarnBootstrap(object):  # base class for user scripts: subclass, implement the hooks, call start()
+    def __init__(self):
+        pass
+
+    __metaclass__ = ABCMeta  # Python 2 metaclass syntax (ignored on Python 3): makes the hooks abstract
+
+    @abstractmethod
+    def worker_do(self, server, cluster_spec, task_id):  # worker hook, called by start() under the device setter
+        pass
+
+    @abstractmethod
+    def ps_do(self, server, cluster_spec, task_id):  # ps hook, called just before server.join()
+        pass
+
+    def device_and_server(self):  # -> (device setter, server, cluster_spec); (None, "", "") when run locally
+        # If FLAGS.job_name is not set, we're running single-machine TensorFlow.
+        # Don't set a device.
+        if FLAGS.job_name is None:
+            print("Running single-machine training")
+            return (None, "", "")
+
+        # Otherwise we're running distributed TensorFlow.
+        print("Running distributed training")
+        if FLAGS.task_index is None or FLAGS.task_index == "":  # integer flag defaulting to 0: effectively never true
+            raise ValueError("Must specify an explicit `task_index`")
+        if FLAGS.ps_hosts is None or FLAGS.ps_hosts == "":
+            raise ValueError("Must specify an explicit `ps_hosts`")
+        if FLAGS.worker_hosts is None or FLAGS.worker_hosts == "":
+            raise ValueError("Must specify an explicit `worker_hosts`")
+
+        cluster_spec = tf.train.ClusterSpec({
+            "ps": FLAGS.ps_hosts.split(","),
+            "worker": FLAGS.worker_hosts.split(","),  # tensorflow_service.py lists chiefworker hosts first
+        })
+        server = tf.train.Server(
+            cluster_spec, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
+        time.sleep(60)  # NOTE(review): fixed 60s pause, presumably to let peer tasks come up -- confirm
+        if FLAGS.job_name == "ps":
+            self.ps_do(server, cluster_spec, FLAGS.task_index)
+            server.join()  # blocks until the server shuts down, so ps tasks never reach the code below
+
+        worker_device = "/job:worker/task:{}".format(FLAGS.task_index)
+        return (
+            tf.train.replica_device_setter(
+                worker_device=worker_device,
+                cluster=cluster_spec),
+            server, cluster_spec
+        )
+
+    def start(self, unused_args):  # entry point called from the user script's main(); requires --ckp_dir
+        if FLAGS.ckp_dir is None or FLAGS.ckp_dir == "":
+            raise ValueError("Must specify an explicit `ckp_dir`")
+        device, server, cluster_spec = self.device_and_server()
+        with tf.device(device):  # device is None for single-machine runs
+            self.worker_do(server, cluster_spec, FLAGS.task_index)
+
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/files/ymnist.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/files/ymnist.py b/app-packages/tensorflow/package/files/ymnist.py
new file mode 100644
index 0000000..e938b05
--- /dev/null
+++ b/app-packages/tensorflow/package/files/ymnist.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.
See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from __future__ import division
+from __future__ import print_function
+
+import os
+
+import tensorflow as tf
+from tensorflow.examples.tutorials.mnist import mnist
+from tensorflow.python.training import training_util
+from yarn_bootstrap import *
+
+tf.app.flags.DEFINE_string("data_dir", "hdfs://hdpdev/user/danrtsey.wy/mnist-data", "Directory containing train.tfrecords")
+FLAGS = tf.app.flags.FLAGS
+
+class Ymnist(YarnBootstrap):  # MNIST example: the YarnBootstrap hooks for a distributed training job
+    def worker_do(self, server, cluster_spec, task_id):  # train until the session stops or 1M global steps
+        print("Checkpoint dir: " + FLAGS.ckp_dir)
+        images, labels = self.inputs(100)
+        logits = mnist.inference(images, 128, 128)
+        train_op = mnist.training(mnist.loss(logits, labels), 0.01)
+        target = "" if server == "" else server.target  # server is "" in single-machine mode
+        with tf.train.MonitoredTrainingSession(
+                master=target,
+                is_chief=(task_id == 0),  # worker task 0 (the chief worker) initialises/restores and checkpoints
+                checkpoint_dir=FLAGS.ckp_dir) as sess:
+            step = 0
+            while not sess.should_stop() and step < 1000000:
+                sess.run(train_op)
+                step = training_util.global_step(sess, training_util.get_global_step(sess.graph))
+                print("Global step " + str(step))
+
+    def ps_do(self, server, cluster_spec, task_id):
+        print("Starting ps " + str(task_id))
+
+    def read_and_decode(self, filename_queue):  # one (image, label) pair parsed from the TFRecord queue
+        reader = tf.TFRecordReader()
+        _, serialized_example = reader.read(filename_queue)
+        features = tf.parse_single_example(
+            serialized_example,
+            # Defaults are not specified since both keys are required.
+            features={
+                'image_raw': tf.FixedLenFeature([], tf.string),
+                'label': tf.FixedLenFeature([], tf.int64),
+            })
+        image = tf.decode_raw(features['image_raw'], tf.uint8)
+        image.set_shape([mnist.IMAGE_PIXELS])
+        # Convert from [0, 255] -> [-0.5, 0.5] floats.
+        image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
+        # Convert label from a scalar int64 tensor to an int32 scalar.
+        label = tf.cast(features['label'], tf.int32)
+        return image, label
+
+    def inputs(self, batch_size):  # shuffled batches read from <--data_dir>/train.tfrecords
+        filename = os.path.join(FLAGS.data_dir, "train.tfrecords")
+        with tf.name_scope('input'):
+            filename_queue = tf.train.string_input_producer([filename])
+            image, label = self.read_and_decode(filename_queue)
+            images, sparse_labels = tf.train.shuffle_batch(
+                [image, label], batch_size=batch_size, num_threads=2,
+                capacity=1000 + 3 * batch_size,
+                # Ensures a minimum amount of shuffling of examples.
+                min_after_dequeue=1000)
+            return images, sparse_labels
+
+def main(unused_argv):
+    Ymnist().start(unused_args=unused_argv)
+
+if __name__ == "__main__":
+    tf.app.run()
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/scripts/functions.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/scripts/functions.py b/app-packages/tensorflow/package/scripts/functions.py
new file mode 100644
index 0000000..60fdbd4
--- /dev/null
+++ b/app-packages/tensorflow/package/scripts/functions.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import json
+import urllib2
+import socket
+from kazoo.client import KazooClient
+from resource_management import *
+
+
+def get_am_rest_base():  # base URI of the Slider AM REST API, read from the ZooKeeper registry ("" if absent)
+    zk = KazooClient(hosts=format("{registry_zk}"))
+    zk.start()
+    try:  # always release the ZooKeeper session: every REST helper below calls this function
+        data, _ = zk.get(format("/registry/users/{user_name}/services/org-apache-slider/{service_name}"))
+    finally:
+        zk.stop()
+        zk.close()
+    rest_uris = [item['addresses'][0]['uri'] for item in json.loads(data)['external']
+                 if item['api'] == 'classpath:org.apache.slider.client.rest']
+    return rest_uris[-1] if rest_uris else ""  # the last matching registry entry wins
+
+
+def get_allocated_resources():  # memory (MB) / vcores per component, keyed "mem.<component>" / "vcore.<component>"
+    resources_rest_url = get_am_rest_base() + '/ws/v1/slider/application/live/resources'
+    resources = json.loads(urllib2.urlopen(resources_rest_url).read())
+    mem_ps = int(resources['components']['ps']['yarn.memory'])
+    vcore_ps = int(resources['components']['ps']['yarn.vcores'])
+    mem_chiefworker = int(resources['components']['chiefworker']['yarn.memory'])
+    vcore_chiefworker = int(resources['components']['chiefworker']['yarn.vcores'])
+    mem_worker = int(resources['components']['worker']['yarn.memory'])
+    vcore_worker = int(resources['components']['worker']['yarn.vcores'])
+    mem_tb = int(resources['components']['tensorboard']['yarn.memory'])
+    vcore_tb = int(resources['components']['tensorboard']['yarn.vcores'])
+    allocated = {"mem.ps": mem_ps, "vcore.ps": vcore_ps,  # named so it does not shadow the dict builtin
+                 "mem.chiefworker": mem_chiefworker, "vcore.chiefworker": vcore_chiefworker,
+                 "mem.worker": mem_worker, "vcore.worker": vcore_worker,
+                 "mem.tensorboard": mem_tb, "vcore.tensorboard": vcore_tb, "vcore_chiefworker": vcore_chiefworker}  # legacy key kept
+    return allocated
+
+def get_allocated_instances_num():  # -> (ps instance count, chiefworker + worker instance count)
+    resources_rest_url = get_am_rest_base() + '/ws/v1/slider/application/live/resources'
+    resources = json.loads(urllib2.urlopen(resources_rest_url).read())
+    n = int(resources['components']['ps']['yarn.component.instances'])
+    cw = int(resources['components']['chiefworker']['yarn.component.instances'])
+    w = int(resources['components']['worker']['yarn.component.instances'])
+    return n, cw + w
+
+def get_launched_instances():  # -> (ps host:port list, chiefworker + worker host:port list) from AM exports
+    try:
+        exports_rest_url = get_am_rest_base() + '/ws/v1/slider/publisher/exports'
+        # get launched ps list
+        exports_ps = json.loads(urllib2.urlopen(exports_rest_url + '/ps').read())
+        ps_list = []
+        for item in exports_ps['entries']['host_port']:
+            ps_list.append(item['value'])
+        # get launched chief worker
+        exports_chiefworker = json.loads(urllib2.urlopen(exports_rest_url + '/chiefworker').read())
+        chiefworker_list = []
+        for item in exports_chiefworker['entries']['host_port']:
+            chiefworker_list.append(item['value'])
+        # get launched worker list
+        exports_worker = json.loads(urllib2.urlopen(exports_rest_url + '/worker').read())
+        worker_list = []
+        for item in exports_worker['entries']['host_port']:
+            worker_list.append(item['value'])
+    except Exception:  # best effort while exports are unpublished (caller polls); don't trap KeyboardInterrupt
+        return ([], [])
+    else:
+        return (ps_list, chiefworker_list + worker_list)  # chief worker first, so it gets worker task_index 0
+
+def get_application_id(container_id):  # application_<4th-from-last>_<3rd-from-last> of the '_'-split container id
+    ss = container_id.split('_')
+    return "application_" + ss[-4] + "_" + ss[-3]
+
+def is_port_active(host, port, retry=3):  # True if a TCP connection to host:port succeeds within `retry` attempts
+    # A closed socket cannot reconnect, so every attempt opens (and closes) a fresh one.
+    for _ in range(retry):
+        try:
+            sock = socket.create_connection((host, port))
+        except socket.error:
+            continue
+        sock.close()
+        return True
+    return False
+
+
+def get_workers():  # -> (running, finished) registry node names of components whose description mentions 'worker'
+    running = []
+    finished = []
+    comps_url = get_am_rest_base() + format(
+        "/ws/v1/slider/registry/users/{user_name}/services/org-apache-slider/{service_name}/components")
+    comps = json.loads(urllib2.urlopen(comps_url).read())
+    for node in comps['nodes']:
+        comp_url = comps_url + "/" + node
+        comp = json.loads(urllib2.urlopen(comp_url).read())
+        if 'worker' in comp['service']['description']:  # substring test; presumably also matches chiefworker -- confirm
+            if comp['service'].get('status') == 'finished':
+                finished.append(node)
+            else:
+                running.append(node)
+    return running, finished
+
+
+def _registry_component(containerid):  # -> (registry node name, service record) for a container, via the AM REST API
+    node = containerid.replace('_', '-')
+    comp_url = get_am_rest_base() + format(
+        "/ws/v1/slider/registry/users/{user_name}/services/org-apache-slider/{service_name}/components/") + node
+    return node, json.loads(urllib2.urlopen(comp_url).read())['service']
+
+
+def _write_registry_component(node, service):  # store a service record in ZooKeeper, always closing the session
+    zk = KazooClient(hosts=format("{registry_zk}"))
+    zk.start()
+    try:
+        zk.set(format("/registry/users/{user_name}/services/org-apache-slider/{service_name}/components/") + node, json.dumps(service))
+    finally:
+        zk.stop()
+        zk.close()
+
+
+def set_container_status(containerid, status='finished'):  # record `status` for the container in the registry
+    node, service = _registry_component(containerid)
+    if service.get('status') != 'finished':  # never overwrite an existing 'finished' mark
+        service['status'] = status
+    _write_registry_component(node, service)
+
+
+def set_retry_num(containerid):  # increment and return the container's restart counter (first call -> 1)
+    node, service = _registry_component(containerid)
+    service['retry'] = int(service.get('retry', 0)) + 1
+    _write_registry_component(node, service)
+    return service['retry']
+
+
+def stop_cluster():
+    # Ask the Slider AM to stop the whole application through its REST API.
+    stop_url = get_am_rest_base() + '/ws/v1/slider/application/action/stop'
+    # GET is used for compatibility with hadoop-2.6.*; it should become POST once
+    # https://issues.apache.org/jira/browse/YARN-2031 is available
+    # (actionStopGet must be implemented in ApplicationResource.java for this to work)
+    res = urllib2.urlopen(stop_url).read()
+    print res
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/scripts/params.py b/app-packages/tensorflow/package/scripts/params.py
new file mode 100644
index 0000000..ed7c6ef
--- /dev/null
+++ b/app-packages/tensorflow/package/scripts/params.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+
+# server configurations: every top-level name here becomes a format() variable via env.set_params(params)
+config = Script.get_config()  # command configuration for this script invocation
+
+hadoop_conf = config['configurations']['global']['hadoop.conf']  # mounted into containers at /usr/local/hadoop/etc/hadoop
+yarn_cg_root = config['configurations']['global']['yarn.cgroup.root']  # prefix for docker --cgroup-parent
+
+user_name = config['configurations']['global']['user.name']
+registry_zk = config['configurations']['global']['zookeeper.quorum']  # ZooKeeper hosts holding the Slider registry
+
+user_scripts_entry = config['configurations']['global']['user.scripts.entry']  # script under app_root run by python
+user_checkpoint_prefix = config['configurations']['global']['user.checkpoint.prefix']  # checkpoints: <prefix>/<service_name>/checkpoints
+
+docker_image = config['configurations']['global']['docker.image']
+
+app_root = config['configurations']['global']['app_root']
+app_log_dir = config['configurations']['global']['app_log_dir']
+pid_file = config['configurations']['global']['pid_file']  # NOTE(review): not referenced by the visible service scripts
+
+container_id = config['configurations']['global']['app_container_id']  # also used as the docker container name
+
+ps_port = config['configurations']['global']['ps.port']
+chiefworker_port = config['configurations']['global']['chiefworker.port']
+worker_port = config['configurations']['global']['worker.port']
+tensorboard_port = config['configurations']['global']['tensorboard.port']
+ports_dict = {"port.ps": ps_port,  # looked up as ports_dict['port.' + componentName]
+              "port.chiefworker": chiefworker_port,
+              "port.worker": worker_port,
+              "port.tensorboard": tensorboard_port}
+
+componentName = config['componentName']  # one of the metainfo components: ps, chiefworker, worker, tensorboard
+service_name = config['serviceName']
+hostname = config['hostname']
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/scripts/tensorflow.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/scripts/tensorflow.py b/app-packages/tensorflow/package/scripts/tensorflow.py
new file mode 100644
index 0000000..d41998b
--- /dev/null
+++ b/app-packages/tensorflow/package/scripts/tensorflow.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.
See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+from tensorflow_service import tensorflow_service
+
+
+class Tensorflow(Script):
+    def install(self, env):
+        self.install_packages(env)
+
+    def configure(self, env):
+        # Publish params.py as the variable namespace used by format()/Execute.
+        import params
+        env.set_params(params)
+
+    def _run_service(self, env, action):
+        # Shared by start/stop/status: load params, then delegate to tensorflow_service.
+        self.configure(env)
+        tensorflow_service(action=action)
+
+    def start(self, env):
+        self._run_service(env, 'start')
+
+    def stop(self, env):
+        self._run_service(env, 'stop')
+
+    def status(self, env):
+        # tensorflow_service may restart a failed container here or raise ComponentIsNotRunning.
+        self._run_service(env, 'status')
+
+
+if __name__ == "__main__":
+    Tensorflow().execute()
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/package/scripts/tensorflow_service.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/package/scripts/tensorflow_service.py b/app-packages/tensorflow/package/scripts/tensorflow_service.py
new file mode 100644
index 0000000..8fe265d
--- /dev/null
+++ b/app-packages/tensorflow/package/scripts/tensorflow_service.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.
See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+import time
+from resource_management import *
+
+def tensorflow_service(action='start'):  # Slider lifecycle action: 'start' or 'stop' or 'status'
+    import params
+    import functions
+    container_id = format("{container_id}")
+    application_id = functions.get_application_id(container_id)  # NOTE(review): not used below
+    componentName = format("{componentName}")
+
+    if action == 'start':
+        checkpoint_dir = format("{user_checkpoint_prefix}/{service_name}/checkpoints")
+        mem = functions.get_allocated_resources()['mem.' + componentName]  # docker -m limit, in MB
+        allocated_port = params.ports_dict['port.' + componentName]
+        # Always launch role tensorboard, using the same configured image as ps/worker
+        if componentName == "tensorboard":
+            daemon_cmd = format("/usr/bin/docker run -d -u $(id -u yarn) --cgroup-parent={yarn_cg_root}/{container_id} -m {mem}m " \
+                "-v {hadoop_conf}:/usr/local/hadoop/etc/hadoop " \
+                "-v /etc/passwd:/etc/passwd -v /etc/group:/etc/group " \
+                "-p {allocated_port}:{allocated_port} --name={container_id} {docker_image} " \
+                "/bin/bash -c 'tensorboard --logdir={checkpoint_dir} --port={allocated_port}'")
+            Execute(daemon_cmd)
+        else:
+            # Waiting for all ps/worker to be exported
+            num_ps, num_worker = functions.get_allocated_instances_num()
+            num_allocated = num_ps + num_worker
+            ps_list, worker_list = functions.get_launched_instances()
+            num_launched = len(ps_list) + len(worker_list)
+            while num_launched < num_allocated:  # NOTE(review): no timeout; polls until every task is exported
+                print format("Waiting for all ports({num_launched}/{num_allocated}) to be exported")
+                time.sleep(5)
+                ps_list, worker_list = functions.get_launched_instances()
+                num_launched = len(ps_list) + len(worker_list)
+            # Generate parameters
+            ps_hosts = ",".join(ps_list)
+            worker_hosts = ",".join(worker_list)
+            task_index = (ps_list.index(format("{hostname}:{allocated_port}"))) if (componentName == 'ps') else (
+                worker_list.index(format("{hostname}:{allocated_port}")))
+            job_name = "worker" if (componentName == 'chiefworker') else componentName  # chief joins the "worker" job
+            # Build clusterSpec and command
+            daemon_cmd = format("/usr/bin/docker run -d -u $(id -u yarn) --cgroup-parent={yarn_cg_root}/{container_id} -m {mem}m " \
+                "-v {hadoop_conf}:/usr/local/hadoop/etc/hadoop " \
+                "-v /etc/passwd:/etc/passwd -v /etc/group:/etc/group " \
+                "-v {app_root}:{app_root} -v {app_log_dir}:{app_log_dir} " \
+                "-p {allocated_port}:{allocated_port} --name={container_id} {docker_image} " \
+                "/bin/bash -c 'export HADOOP_USER_NAME={user_name}; /usr/bin/python {app_root}/{user_scripts_entry} " \
+                "--ps_hosts={ps_hosts} --worker_hosts={worker_hosts} --job_name={job_name} --task_index={task_index} " \
+                "--ckp_dir={checkpoint_dir} --work_dir={app_root} >>{app_log_dir}/tensorflow.out 2>>{app_log_dir}/tensorflow.err'")
+            Execute(daemon_cmd)
+    elif action == 'stop':
+        cmd = format("/usr/bin/docker stop {container_id}")
+        op_test = format("/usr/bin/docker ps | grep {container_id} >/dev/null 2>&1")
+        Execute(cmd,
+                tries=5,
+                try_sleep=10,
+                wait_for_finish=True,
+                only_if=op_test
+                )
+    elif action == 'status':
+        cmd_status = "/usr/bin/docker inspect -f '{{.State.Running}}' %s" % container_id
+        container_running = os.popen(cmd_status).read().strip('\n')
+        if container_running == 'true':
+            print "Component instance is running..."
+            # Role tensorboard will watch all workers' status
+            if componentName == "tensorboard":
+                running, finished = functions.get_workers()
+                print "Running tensorflow workers(%s) : %s \nFinished tensorflow workers(%s) : %s" \
+                    % (len(running), ','.join(running), len(finished), ','.join(finished))
+                # All worker has finished successfully, going to stop cluster...
+                num_ps, num_worker = functions.get_allocated_instances_num()
+                if len(finished) == num_worker:
+                    functions.stop_cluster()
+        else:  # container has exited: restart it on failure, otherwise mark it finished
+            cmd_exit = "/usr/bin/docker inspect -f '{{.State.ExitCode}}' %s" % container_id
+            exit_code = int(os.popen(cmd_exit).read().strip('\n'))  # NOTE(review): ValueError if inspect prints nothing
+            if exit_code != 0:
+                retry = functions.set_retry_num(container_id)
+                if retry <= 5:  # at most 5 restarts, then report the component as down
+                    # Remove failed docker container
+                    cmd_rm = format("/usr/bin/docker rm -f {container_id}")
+                    Execute(cmd_rm)
+                    # restart user tensorflow script
+                    tensorflow_service(action='start')
+                else:
+                    raise ComponentIsNotRunning()
+            else:
+                print "Component instance has finished successfully"
+                functions.set_container_status(container_id)
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/resources.default.json
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/resources.default.json b/app-packages/tensorflow/resources.default.json
new file mode 100644
index
0000000..b0ef16a --- /dev/null +++ b/app-packages/tensorflow/resources.default.json @@ -0,0 +1,38 @@ +{ + "schema" : "http://example.org/specification/v2.0.0", + "metadata" : { + }, + "global" : { + }, + "components": { + "slider-appmaster": { + }, + "ps": { + "yarn.role.priority": "1", + "yarn.component.instances": "2", + "yarn.memory": "2048", + "yarn.vcores": "1", + "yarn.container.failure.threshold": "0" + }, + "chiefworker": { + "yarn.role.priority": "2", + "yarn.component.instances": "1", + "yarn.memory": "4096", + "yarn.vcores": "1", + "yarn.container.failure.threshold": "0" + }, + "worker": { + "yarn.role.priority": "3", + "yarn.component.instances": "5", + "yarn.memory": "2048", + "yarn.vcores": "1", + "yarn.container.failure.threshold": "0" + }, + "tensorboard": { + "yarn.role.priority": "4", + "yarn.component.instances": "1", + "yarn.memory": "4096", + "yarn.vcores": "1" + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/ytensorflow/config.default.json ---------------------------------------------------------------------- diff --git a/app-packages/tensorflow/ytensorflow/config.default.json b/app-packages/tensorflow/ytensorflow/config.default.json new file mode 100644 index 0000000..eff4368 --- /dev/null +++ b/app-packages/tensorflow/ytensorflow/config.default.json @@ -0,0 +1,29 @@ +{ + "schema" : "http://example.org/specification/v2.0.0", + "commandConfig": { + "app.name": "tfdocker1" + }, + "appConfig" : { + "global": { + "user.scripts.entry": "ymnist.py" + } + }, + "resources": { + "components": { + "ps": { + "yarn.component.instances": "2", + "yarn.memory": "2048", + "yarn.vcores": "1" + }, + "chiefworker": { + "yarn.memory": "4096", + "yarn.vcores": "1" + }, + "worker": { + "yarn.component.instances": "4", + "yarn.memory": "2048", + "yarn.vcores": "1" + } + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/ytensorflow/ytensorflow
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/ytensorflow/ytensorflow b/app-packages/tensorflow/ytensorflow/ytensorflow
new file mode 100644
index 0000000..2e3c689
--- /dev/null
+++ b/app-packages/tensorflow/ytensorflow/ytensorflow
@@ -0,0 +1,65 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# this is the wrapper shell script to invoke the ytensorflow.py script in the same folder
+
+bin=`S=\`readlink "$0"\`; [ -z "$S" ] && S=$0; dirname $S`
+bin=`cd "$bin"; pwd`
+
+function print_usage(){
+  echo "Usage: ytensorflow COMMAND"
+  echo "       where COMMAND is one of:"
+  echo "  cluster              cluster management"
+  echo "  version              print the version"
+  echo ""
+  echo "Most commands print help when invoked w/o parameters."
+}
+
+if [ $# = 0 ]; then
+  print_usage
+  exit
+fi
+
+COMMAND=$1
+case $COMMAND in
+  # usage flags
+  --help|-help|-h)
+    print_usage
+    exit
+    ;;
+
+  # tensorflow cluster management
+  cluster)
+    [ ! -d "$HOME/.slider" ] && { mkdir "$HOME/.slider"; }
+    path_tmp="$HOME/.slider/tensorflow.$(date +'%s')"
+    cp -r "$bin/.." "$path_tmp"
+    /usr/bin/python "$bin/ytensorflow.py" "$@" "$path_tmp"
+    rc=$?; [ $rc -eq 0 ] && rm -rf "$path_tmp"  # the package copy is kept when the command fails
+    exit $rc
+    ;;
+
+  version)
+    /usr/bin/python "$bin/ytensorflow.py" "$@"
+    exit
+    ;;
+
+  *)
+    echo "Error: No command named $COMMAND was found."
+    exit 1
+    ;;
+esac
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5bf14692/app-packages/tensorflow/ytensorflow/ytensorflow.py
----------------------------------------------------------------------
diff --git a/app-packages/tensorflow/ytensorflow/ytensorflow.py b/app-packages/tensorflow/ytensorflow/ytensorflow.py
new file mode 100644
index 0000000..c0597c4
--- /dev/null
+++ b/app-packages/tensorflow/ytensorflow/ytensorflow.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+import subprocess
+import sys
+import json
+
+USAGE = ("Usage: ytensorflow.py version\n"
+         "       ytensorflow.py cluster -start <config.json> [-files <path>] <app_dir>\n"
+         "       ytensorflow.py cluster -stop <app_name> <app_dir>\n"
+         "       ytensorflow.py cluster -status <app_name> <app_dir>")
+
+
+def main():
+    """
+    ytensorflow main method.
+
+    Reads its arguments from sys.argv. For the 'cluster' command the
+    ytensorflow wrapper script appends a temporary copy of the app package
+    directory as the last argument; parse_conf() and cp_files() write into it.
+
+    :return: exit code of the process (0 on success)
+    """
+    slider_home = sys.path[0] + '/../../..'
+    slider = slider_home + '/bin/slider' if os.path.exists(slider_home + '/bin/slider') else 'slider'
+    args = sys.argv[1:]
+    if args and args[0] == 'version':
+        print(get_version())
+        return 0
+    # 'cluster' needs an action plus its target (config file or app name).
+    if len(args) < 3 or args[0] != 'cluster':
+        print(USAGE)
+        return 1
+    action, target = args[1], args[2]
+    if action == '-start':
+        app_name = parse_conf(target)
+        if '-files' in args:
+            cp_files(args[args.index('-files') + 1])
+        # The shell groups this as (status || destroy) && create: destroy runs
+        # only when status fails, and create runs if either of them succeeded.
+        cmd = "%s status %s || %s destroy %s --force && %s create %s --appdef %s --resources %s --template %s" \
+              % (slider, app_name,
+                 slider, app_name,
+                 slider, app_name, sys.argv[-1], sys.argv[-1] + '/resources.json', sys.argv[-1] + '/appConfig.json')
+    elif action == '-stop':
+        cmd = slider + " stop " + target
+    elif action == '-status':
+        cmd = slider + " status " + target
+    else:
+        print(USAGE)
+        return 1
+    print(cmd)
+    # os.system() returns an encoded wait status (exit code << 8 on POSIX),
+    # which sys.exit() truncates to 8 bits, so a failing command would be
+    # reported as success; subprocess.call() returns the exit code itself.
+    return subprocess.call(cmd, shell=True)
+
+
+def get_version():
+    """
+    Return '<application name> on Slider <version>' read from metainfo.json.
+    """
+    root_path = sys.path[0] + '/..'
+    with open(root_path + "/metainfo.json", 'r') as f:
+        metainfo = json.load(f)
+    name = metainfo['application']['name']
+    version = metainfo['application']['version']
+    return name + " on Slider " + version
+
+
+def cp_files(files):
+    """
+    Copy user-supplied files into package/files/ of the temporary app package.
+
+    :param files: path given after '-files'; it is interpolated into a shell
+                  command, so it must not contain unescaped spaces
+    :raises RuntimeError: if the copy command fails
+    """
+    status = os.system('cp -r ' + files + ' ' + sys.argv[-1] + '/package/files/')
+    if status != 0:
+        raise RuntimeError("failed to copy '%s' into the application package" % files)
+
+
+def parse_conf(config_file):
+    """
+    Write appConfig.json and resources.json into the temporary app package.
+
+    The packaged appConfig.default.json and resources.default.json are loaded
+    and overridden with the user's config file:
+      - appConfig.global.<key> is stored as global 'site.global.<key>'
+      - resources.components.<component>.<key> overrides that component's key
+    Both files are always written, even when the user config omits a section,
+    because the 'slider create' command built in main() references both.
+
+    :param config_file: path to the user's JSON config
+    :return: commandConfig 'app.name', or 'default_app_name' if it is absent
+    """
+    root_path = sys.path[0] + '/..'
+    tmp_path = sys.argv[-1]
+    with open(config_file, 'r') as f:
+        data = json.load(f)
+    app_name = data.get('commandConfig', {}).get('app.name', 'default_app_name')
+
+    # Override appConfig.default.json with the user's appConfig.global entries.
+    with open(root_path + '/appConfig.default.json', 'r') as f:
+        data_app = json.load(f)
+    app_global = data_app.setdefault('global', {})
+    for key, value in data.get('appConfig', {}).get('global', {}).items():
+        app_global["site.global." + key] = value
+    with open(tmp_path + '/appConfig.json', 'w') as f:
+        json.dump(data_app, f)
+
+    # Override resources.default.json with the user's per-component settings.
+    with open(root_path + '/resources.default.json', 'r') as f:
+        data_res = json.load(f)
+    components = data_res.setdefault('components', {})
+    for name, overrides in data.get('resources', {}).get('components', {}).items():
+        if name not in components:
+            # Only components declared in resources.default.json are accepted.
+            print("Ignoring resources for unknown component '%s'" % name)
+            continue
+        components[name].update(overrides)
+    with open(tmp_path + '/resources.json', 'w') as f:
+        json.dump(data_res, f)
+    return app_name
+
+
+if __name__ == '__main__':
+    # Entry point: any uncaught error is reported and mapped to exit code -1.
+    try:
+        returncode = main()
+    except Exception as e:
+        print("Exception: %s " % str(e))
+        returncode = -1
+    sys.exit(returncode)