Repository: ambari Updated Branches: refs/heads/branch-2.5 3c3d3d23d -> 4c2c50aa4
AMBARI-18740. Perf: Automate deployment of PERF stack on 2500+ Ambari Agents.(vbrodetskyi) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4c2c50aa Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4c2c50aa Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4c2c50aa Branch: refs/heads/branch-2.5 Commit: 4c2c50aa4da0a683f52117fe606de4f4200adbbc Parents: 3c3d3d2 Author: Vitaly Brodetskyi <[email protected]> Authored: Wed Nov 16 09:30:06 2016 +0200 Committer: Vitaly Brodetskyi <[email protected]> Committed: Wed Nov 16 09:30:06 2016 +0200 ---------------------------------------------------------------------- contrib/utils/perf/deploy-gce-perf-cluster.py | 360 +++++++++++++++++++++ 1 file changed, 360 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4c2c50aa/contrib/utils/perf/deploy-gce-perf-cluster.py ---------------------------------------------------------------------- diff --git a/contrib/utils/perf/deploy-gce-perf-cluster.py b/contrib/utils/perf/deploy-gce-perf-cluster.py new file mode 100644 index 0000000..da45f8f --- /dev/null +++ b/contrib/utils/perf/deploy-gce-perf-cluster.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import argparse +import os +import subprocess +import sys +import pprint +import time +import traceback +import re +import socket + +cluster_name="perf-cluster" +ambari_repo_file_url="http://s3.amazonaws.com/dev.hortonworks.com/ambari/centos6/2.x/latest/trunk/ambaribn.repo" + +public_hostname_script="foo" +hostname_script="foo" + +start_number=1 +number_of_agents_on_host=50 + + +class SSH: + """ Ssh implementation of this """ + def __init__(self, user, sshkey_file, host, command, custom_option='', errorMessage = None): + self.user = user + self.sshkey_file = sshkey_file + self.host = host + self.command = command + self.errorMessage = errorMessage + self.custom_option = custom_option + pass + + + def run(self): + sshcommand = ["ssh", + "-o", "ConnectTimeOut=180", + "-o", "StrictHostKeyChecking=no", + "-o", "BatchMode=yes", + self.custom_option, + "-i", self.sshkey_file, + self.user + "@" + self.host, self.command] + + if not self.custom_option: + del sshcommand[7] + + i = 1 + while True: + try: + sshstat = subprocess.Popen(sshcommand, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + log = sshstat.communicate() + if sshstat.returncode != 0: + print "Executing SSH command on {0} failed: {1}".format(self.host, log) + print "\nRetrying SSH command one more time!" + if i >= 3: + break + i += 1 + time.sleep(10) + continue + break + except: + print "Could not SSH to {0}, waiting for it to start".format(self.host) + i += 1 + time.sleep(10) + + if i >= 3: + print "Could not execute remote ssh command: " + ' '.join(sshcommand) + raise Exception("Could not connect to {0}. Giving up with erros: {1}".format(self.host, log)) + + errorMsg = log[1] + if self.errorMessage and sshstat.returncode != 0: + errorMsg = self.errorMessage + "\n" + errorMsg + + print "SSH command execution finished" + + return {"exitstatus": sshstat.returncode, "log": log, "errormsg": errorMsg} + + +class SCP: + """ SCP implementation that is thread based. The status can be returned using + status val """ + def __init__(self, user, sshkey_file, host, inputFile, remote, errorMessage = None): + self.user = user + self.sshkey_file = sshkey_file + self.host = host + self.inputFile = inputFile + self.remote = remote + self.errorMessage = errorMessage + pass + + + def run(self): + scpcommand = ["scp", + "-r", + "-o", "ConnectTimeout=60", + "-o", "BatchMode=yes", + "-o", "StrictHostKeyChecking=no", + "-i", self.sshkey_file, self.inputFile, self.user + "@" + + self.host + ":" + self.remote] + + i = 1 + while True: + try: + scpstat = subprocess.Popen(scpcommand, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + log = scpstat.communicate() + if scpstat.returncode != 0: + print "Executing SCP command on {0} failed: {1}".format(self.host, log) + print "\nRetrying SCP command one more time!" + if i >= 3: + break + i += 1 + time.sleep(10) + continue + break + except: + print "Could not SCP to {0}, waiting for it to start".format(self.host) + i += 1 + time.sleep(10) + + if i >= 3: + print "Could not execute remote scp command: " + ' '.join(scpcommand) + raise Exception("Could not connect to {0}. Giving up with erros: {1}".format(self.host, log)) + + errorMsg = log[1] + if self.errorMessage and scpstat.returncode != 0: + errorMsg = self.errorMessage + "\n" + errorMsg + + print "SCP command execution finished" + + return {"exitstatus": scpstat.returncode, "log": log, "errormsg": errorMsg} + +# main method to parse arguments from user and start work +def main(): + parser = argparse.ArgumentParser( + description='This script brings up a cluster with ambari installed, configured and started', + epilog='Only GCE is supported as of now!' + ) + + # options + parser.add_argument('--controller', type=str, + action='store', help='GCE controller ip address.') + + parser.add_argument('--key', type=str, + action='store', help='Path to GCE ssh key.') + + parser.add_argument('--cluster-prefix', type=str, + action='store', help='Cluster name prefix.') + + parser.add_argument('--agent-prefix', type=str, + action='store', help='Agent name prefix.') + + parser.add_argument('--agents-count', type=int, + action='store', help='Agents count for whole cluster(multiples of 50).') + + if len(sys.argv) <= 1: + parser.print_help() + else: + args = parser.parse_args() + do_work(args) + +# base method which process cluster deployment +def deploy_cluster(args): + number_of_nodes=args.agents_count/number_of_agents_on_host + # trying to create cluster with needed params + print "Creating cluster {0}-{1} with {2} large nodes on centos6...".format(args.cluster_prefix, cluster_name, str(number_of_nodes)) + execute_command(args, args.controller, "/usr/sbin/gce up {0}-{1} {2} --centos6 --large".format(args.cluster_prefix, cluster_name, str(number_of_nodes)), + "Failed to create cluster, probably not enough resources!", "-tt") + + # VMs are not accessible immediately + time.sleep(10) + + # getting list of vms information like hostname and ip address + print "Getting list of virtual machines from cluster..." + vms = get_vms_list(args) + + # check number of nodes in cluster to be the same as user asked + print "Checking count of created nodes in cluster..." + if not vms or len(vms) < number_of_nodes: + raise Exception("Can not bring up enough nodes. Requested {0}, but got: {1}. Probably not enough resources!".format(number_of_nodes, len(vms))) + + print "GCE cluster was successfully created!" + pretty_print_vms(vms) + + # installing/starting ambari-server and ambari-agents on each host + server_host_name = sorted(vms.items())[0][0] + server_installed=False + + print "Creating server.sh script (which will be executed on server to install/configure/start ambari-server and ambari-agent)..." + create_server_script(args, server_host_name) + + print "Creating agent.sh script (which will be executed on agent hosts to install/configure/start ambari-agent..." + create_agent_script(args, server_host_name) + + time.sleep(10) + for (hostname, ip) in sorted(vms.items()): + print "==========================" + print "Working on {0}".format(hostname) + if not server_installed: + remote_path = "/server.sh" + local_path = "server.sh" + print "Copying server.sh to {0}...".format(hostname) + put_file(args, ip, local_path, remote_path, "Failed to copy file!") + print "Executing remote ssh command (set correct permissions and start executing server.sh in separate process) on {0}...".format(hostname) + execute_command(args, ip, "cd /; chmod 777 server.sh; nohup ./server.sh >/server.log 2>&1 &", + "Install/configure/start server script failed!") + server_installed = True + else: + remote_path = "/agent.sh" + local_path = "agent.sh" + print "Copying agent.sh to {0}...".format(hostname) + put_file(args, ip, local_path, remote_path, "Failed to copy file!") + print "Executing remote ssh command (set correct permissions and start executing agent.sh in separate process) on {0}...".format(hostname) + execute_command(args, ip, "cd /; chmod 777 agent.sh; nohup ./agent.sh >/agent.log 2>&1 &", + "Install/configure start agent script failed!") + + print "All scripts where successfully copied and started on all hosts. " \ + "\nPay attention that server.sh script need 5 minutes to finish and agent.sh need 3 minutes!" + +# check if all required params were passed by user +# if all needed params available then start cluster deploy +def do_work(args): + if not args.controller: + raise Failure("GCE controller ip address not defined!") + + if not args.key: + raise Failure("Path to gce ssh key not defined!") + + if not args.cluster_prefix: + raise Failure("Cluster name prefix not defined!") + + if not args.agent_prefix: + raise Failure("Agent name prefix not defined!") + + if not args.agents_count: + raise Failure("Agents count for whole cluster(multiples of 50) not defined!") + + deploy_cluster(args) + +# creating server.sh script in the same dir where current script is located +# server.sh script will install, configure and start ambari-server and ambari-agent on host +def create_server_script(args, server_host_name): + file = open("server.sh", "w") + + file.write("#!/bin/bash\n") + + file.write("wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url)) + file.write("yum clean all; yum install git ambari-server ambari-agent -y\n") + file.write("cd /home; git clone https://github.com/apache/ambari.git\n") + + file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-server/resources/stacks/PERF\n") + file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n") + + file.write("ambari-server setup -s\n") + file.write("sed -i -e 's/false/true/g' /var/lib/ambari-server/resources/stacks/PERF/1.0/metainfo.xml\n") + file.write("ambari-server start --skip-database-check\n") + + file.write("sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name)) + file.write("sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script)) + file.write("printf \"start={0}\\nnum={1}\\nprefix={2}\" > /etc/ambari-agent/conf/agent-multiplier.conf\n".format(start_number, number_of_agents_on_host, args.agent_prefix)) + file.write("python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n") + + file.write("exit 0") + + file.close() + +# creating agent.sh script in the same dir where current script is located +# agent.sh script will install, configure and start ambari-agent on host +def create_agent_script(args, server_host_name): + file = open("agent.sh", "w") + + file.write("#!/bin/bash\n") + + file.write("wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url)) + file.write("yum clean all; yum install git ambari-agent -y\n") + file.write("cd /home; git clone https://github.com/apache/ambari.git\n") + file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n") + + file.write("sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name)) + file.write("sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script)) + file.write("printf \"start={0}\\nnum={1}\\nprefix={2}\" > /etc/ambari-agent/conf/agent-multiplier.conf\n".format(start_number, number_of_agents_on_host, args.agent_prefix)) + file.write("python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n") + file.write("exit 0") + + file.close() + +# method to execute ssh commands via SSH class +def execute_command(args, ip, cmd, fail_message, custom_option='', login='root'): + ssh = SSH(login, args.key, ip, cmd, custom_option, fail_message) + ssh_result = ssh.run() + status_code = ssh_result["exitstatus"] + if status_code != 0: + raise Exception(ssh_result["errormsg"]) + + return ssh_result["log"][0] + +# method to copy file from local to remote host via SCP class +def put_file(args, ip, local_file, remote_file, fail_message, login='root'): + scp = SCP(login, args.key, ip, local_file, + remote_file, fail_message) + scp_result = scp.run() + status_code = scp_result["exitstatus"] + if status_code != 0: + raise Exception(scp_result["errormsg"]) + + return scp_result["log"][0] + +# method to parse "gce fqdn {cluster-name}" command output and get hosts and ips pairs for every host in cluster +def get_vms_list(args): + gce_fqdb_cmd = '/usr/sbin/gce fqdn {0}-{1}'.format( + args.cluster_prefix, cluster_name) + out = execute_command(args, args.controller, gce_fqdb_cmd, "Failed to get VMs list!", "-tt") + lines = out.split('\n') + #print "LINES=" + str(lines) + if lines[0].startswith("Using profile") and not lines[1].strip(): + result = {} + for s in lines[2:]: # Ignore non-meaningful lines + if not s: + continue + match = re.match(r'^([\d\.]*)\s+([\w\.-]*)\s+([\w\.-]*)\s$', s, re.M) + if match: + result[match.group(2)] = match.group(1) + else: + raise Exception('Can not parse "{0}"'.format(s)) + return result + else: + raise Exception('Can not parse "{0}"'.format(lines)) + + +def pretty_print_vms(vms): + print "----------------------------" + print "server ip: {0}".format(sorted(vms.items())[0][1]) + print "Hostnames of nodes in cluster:" + for (hostname, ip) in sorted(vms.items()): + print hostname + print "----------------------------" + + +if __name__ == "__main__": + main() + +
