Updated Branches: refs/heads/master 3f176033d -> cac2d3995
Added 'mesos-scp' CLI command. The CLI command mesos-scp uses scp to copy the specified local file(s) to a specified directory on all slaves known by the current master. From: Du Li <lidu...@gmail.com> Review: https://reviews.apache.org/r/14963 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cac2d399 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cac2d399 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cac2d399 Branch: refs/heads/master Commit: cac2d39958afe95aeb465a939bb19c3ea6b1ac1a Parents: 3f17603 Author: Benjamin Hindman <benjamin.hind...@gmail.com> Authored: Wed Dec 4 12:16:47 2013 -0800 Committer: Benjamin Hindman <benjamin.hind...@gmail.com> Committed: Thu Dec 5 14:29:39 2013 -0800 ---------------------------------------------------------------------- src/Makefile.am | 1 + src/cli/mesos-cat | 36 +++++-------- src/cli/mesos-ps | 24 ++++----- src/cli/mesos-scp | 109 +++++++++++++++++++++++++++++++++++++++ src/cli/mesos-tail | 34 +++++------- src/cli/python/mesos/cli.py | 7 +++ 6 files changed, 153 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 42dafbc..5f211a2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -405,6 +405,7 @@ mesos_resolve_LDADD = libmesos.la dist_bin_SCRIPTS += \ cli/mesos-cat \ cli/mesos-ps \ + cli/mesos-scp \ cli/mesos-tail # Also install the supporting scripts for the Python based CLI tools. http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/mesos-cat ---------------------------------------------------------------------- diff --git a/src/cli/mesos-cat b/src/cli/mesos-cat index f898e82..73dc63e 100755 --- a/src/cli/mesos-cat +++ b/src/cli/mesos-cat @@ -6,9 +6,8 @@ import signal import sys import urllib2 -from contextlib import closing from optparse import OptionParser -from urllib2 import HTTPError, urlopen +from urllib2 import HTTPError from mesos import http from mesos.cli import * @@ -16,8 +15,7 @@ from mesos.futures import * if sys.version_info < (2,6,0): - sys.stderr.write('Expecting Python >= 2.6\n') - sys.exit(1) + fatal('Expecting Python >= 2.6') def read(slave, task, file): @@ -32,8 +30,7 @@ def read(slave, task, file): try: state = json.loads(http.get(slave['pid'], '/state.json')) except: - sys.stderr.write('Failed to get state from slave\n') - sys.exit(1) + fatal('Failed to get state from slave') directory = None @@ -56,8 +53,7 @@ def read(slave, task, file): break if directory is None: - sys.stderr.write('File not found\n') - sys.exit(1) + fatal('File not found') path = os.path.join(directory, file) @@ -70,10 +66,9 @@ def read(slave, task, file): 'offset': -1})) except HTTPError as error: if error.code == 404: - sys.stderr.write('No such file or directory\n') + fatal('No such file or directory') else: - sys.stderr.write('Failed to determine length of file\n') - sys.exit(1) + fatal('Failed to determine length of file') length = result['offset'] @@ -94,8 +89,7 @@ def read(slave, task, file): if offset == length: return except: - sys.stderr.write('Failed to read file from slave\n') - sys.exit(1) + fatal('Failed to read file from slave') def main(): @@ -124,8 +118,7 @@ def main(): state = json.loads(http.get(resolve(options.master), '/master/state.json')) except: - sys.stderr.write('Failed to get the master state\n') - sys.exit(1) + fatal('Failed to get the master state') # Build a dict from slave ID to slaves. slaves = {} @@ -155,15 +148,14 @@ def main(): cat(slaves[task['slave_id']], task) sys.exit(0) - sys.stderr.write('No task found!\n') - sys.exit(-1) + fatal('No task found!') if __name__ == '__main__': - def signal_handler(signal, frame): - sys.stdout.write('\n') - sys.exit(130) + def signal_handler(signal, frame): + sys.stdout.write('\n') + sys.exit(130) - signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGINT, signal_handler) - main() + main() http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/mesos-ps ---------------------------------------------------------------------- diff --git a/src/cli/mesos-ps b/src/cli/mesos-ps index 83cde13..ddd9ec5 100755 --- a/src/cli/mesos-ps +++ b/src/cli/mesos-ps @@ -5,9 +5,7 @@ import json import signal import sys -from contextlib import closing from optparse import OptionParser -from urllib2 import urlopen from mesos import http from mesos.cli import * @@ -15,8 +13,7 @@ from mesos.futures import * if sys.version_info < (2,6,0): - sys.stderr.write('Expecting Python >= 2.6\n') - sys.exit(1) + fatal('Expecting Python >= 2.6') USER_COLUMN_PADDING = 4 @@ -158,16 +155,14 @@ def main(): try: timeout = float(options.timeout) except: - sys.stderr.write('Expecting --timeout to be a floating point number\n') - sys.exit(-1) + fatal('Expecting --timeout to be a floating point number') # Get the master's state. try: state = json.loads(http.get(resolve(options.master), '/master/state.json')) except: - sys.stderr.write('Failed to get the master state\n') - sys.exit(1) + fatal('Failed to get the master state') # Collect all the active frameworks and tasks by slave ID. active = {} @@ -208,8 +203,7 @@ def main(): try: statistics = json.loads(future.result()) except TimeoutError: - sys.stderr.write('Timed out while waiting for slaves\n') - sys.exit(1) + fatal('Timed out while waiting for slaves') except Exception as e: # TODO(benh): Print error if 'verbose'. pass @@ -229,10 +223,10 @@ def main(): if __name__ == '__main__': - def handler(signal, frame): - sys.stdout.write('\n') - sys.exit(130) + def handler(signal, frame): + sys.stdout.write('\n') + sys.exit(130) - signal.signal(signal.SIGINT, handler) + signal.signal(signal.SIGINT, handler) - main() + main() http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/mesos-scp ---------------------------------------------------------------------- diff --git a/src/cli/mesos-scp b/src/cli/mesos-scp new file mode 100755 index 0000000..77b8557 --- /dev/null +++ b/src/cli/mesos-scp @@ -0,0 +1,109 @@ +#!/usr/bin/env python + +# Uses 'scp' to copy local files to all slaves reported by the master. + +import json +import signal +import subprocess +import sys + +from optparse import OptionParser + +from mesos import http +from mesos.cli import * +from mesos.futures import * + + +if sys.version_info < (2,6,0): + fatal('Expecting Python >= 2.6') + + +def scp(host, src, dst): + cmd = 'scp -pr %s %s' % (src, host + ':' + dst) + try: + process = subprocess.Popen( + cmd, + stdin=None, + stdout=None, + stderr=None, + shell=True) + return process.wait() == 0 + except Exception as e: + sys.stderr.write('Exception %s when doing %s\n' % (e, cmd)) + return False + + +def main(): + # Parse options for this script. + parser = OptionParser() + parser.add_option('--master') + parser.usage = '%prog [options] local-file(s) remote-directory' + parser.epilog = ('This command uploads the specifeid local file(s) ' + 'to a remote directory on all slaves known by the ' + 'master. The current implementation assumes ' + 'passwordless scp') + (options, args) = parser.parse_args(sys.argv) + + if options.master is None: + usage('Missing --master', parser) + + # Get the master's state. + try: + state = json.loads(http.get(resolve(options.master), + '/master/state.json')) + except: + fatal('Failed to get the master state') + + # all slaves that the master is aware of + slaves = set(slave['hostname'] for slave in state['slaves']) + + if len(args) < 3: + usage('Missing arguments', parser) + + # All arguments after args[0] until the last argument are the + # local files. + src = " ".join(args[1:-1]) + + # Remote directory is the very last argument. + dst = args[-1] + + success = set() + with ThreadingExecutor() as executor: + futures = dict((executor.submit(scp, slave, src, dst), slave) + for slave in slaves) + for future in as_completed(futures): + slave = futures[future] + try: + status = future.result() + if status: + success.add(slave) + except Exception as e: + sys.stderr.write('Failed to copy to %s: %s\n' % (slave, e)) + + print + + for slave in success: + print '%s\t%s' % (slave, 'uploaded') + + print + + failed = slaves - success + for slave in failed: + print '%s\t%s' % (slave, 'failed') + + print + + print ('----- %d uploaded, %d failed of total %d slaves' + % (len(success), len(failed), len(slaves))) + + print + + +if __name__ == '__main__': + def handler(signal, frame): + sys.stdout.write('\n') + sys.exit(130) + + signal.signal(signal.SIGINT, handler) + + main() http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/mesos-tail ---------------------------------------------------------------------- diff --git a/src/cli/mesos-tail b/src/cli/mesos-tail index e2aeaac..256a804 100755 --- a/src/cli/mesos-tail +++ b/src/cli/mesos-tail @@ -7,9 +7,8 @@ import sys import time import itertools -from contextlib import closing from optparse import OptionParser -from urllib2 import HTTPError, urlopen +from urllib2 import HTTPError from mesos import http from mesos.cli import * @@ -17,8 +16,7 @@ from mesos.futures import * if sys.version_info < (2,6,0): - sys.stderr.write('Expecting Python >= 2.6\n') - sys.exit(1) + fatal('Expecting Python >= 2.6') def read_forever(slave, task, file): framework_id = task['framework_id'] @@ -32,8 +30,7 @@ def read_forever(slave, task, file): try: state = json.loads(http.get(slave['pid'], '/state.json')) except: - sys.stderr.write('Failed to get state from slave\n') - sys.exit(1) + fatal('Failed to get state from slave') directory = None @@ -47,8 +44,7 @@ def read_forever(slave, task, file): break if directory is None: - sys.stderr.write('Task directory not found\n') - sys.exit(1) + fatal('Task directory not found') path = os.path.join(directory, file) @@ -65,10 +61,9 @@ def read_forever(slave, task, file): 'length': PAGE_LENGTH})) except HTTPError as error: if error.code == 404: - sys.stderr.write('No such file or directory\n') + fatal('No such file or directory') else: - sys.stderr.write('Failed to read file from slave\n') - sys.exit(1) + fatal('Failed to read file from slave') if len(result['data']) == 0: time.sleep(0.5) continue @@ -102,8 +97,7 @@ def main(): master_state = json.loads(http.get(resolve(options.master), '/master/state.json')) except: - sys.stderr.write('Failed to get the master state\n') - sys.exit(1) + fatal('Failed to get the master state') # Build a dict from slave ID to `slaves'. slaves = {} @@ -123,16 +117,14 @@ def main(): tail(slaves[task['slave_id']], task, options.file) sys.exit(0) - sys.stderr.write('No task or framework found!\n') - sys.stderr.flush() - sys.exit(-1) + fatal('No task or framework found!') if __name__ == '__main__': - def signal_handler(signal, frame): - sys.stdout.write('\n') - sys.exit(130) + def signal_handler(signal, frame): + sys.stdout.write('\n') + sys.exit(130) - signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGINT, signal_handler) - main() + main() http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/python/mesos/cli.py ---------------------------------------------------------------------- diff --git a/src/cli/python/mesos/cli.py b/src/cli/python/mesos/cli.py index 5c11d46..f32ba49 100644 --- a/src/cli/python/mesos/cli.py +++ b/src/cli/python/mesos/cli.py @@ -6,6 +6,13 @@ def usage(message, parser): sys.exit(-1) +# Helper for printing out a message and then exiting. +def fatal(message): + import sys + sys.stderr.write(message + '\n') + sys.exit(-1) + + # Helper that uses 'mesos-resolve' to resolve a master IP:port from # one of: # zk://host1:port1,host2:port2,.../path