This is an automated email from the ASF dual-hosted git repository.
amagyar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 726d1d0 AMBARI-24708. Automation script for upgrade old style isilon
cluster to the new mpack based structure (amagyar) (#2397)
726d1d0 is described below
commit 726d1d016ec8eae1056a1412f8b857927dfc32ad
Author: Attila Magyar <[email protected]>
AuthorDate: Mon Oct 1 13:40:54 2018 +0200
AMBARI-24708. Automation script for upgrade old style isilon cluster to the
new mpack based structure (amagyar) (#2397)
* AMBARI-24708. Automation script for upgrade old style isilon cluster to
the new mpack based structure (amagyar)
* AMBARI-24708. Automation script for upgrade old style isilon cluster to
the new mpack based structure (amagyar)
---
.../src/main/tools/hdfs_to_onefs_convert.py | 531 +++++++++++++++++++++
1 file changed, 531 insertions(+)
diff --git a/contrib/management-packs/isilon-onefs-mpack/src/main/tools/hdfs_to_onefs_convert.py b/contrib/management-packs/isilon-onefs-mpack/src/main/tools/hdfs_to_onefs_convert.py
new file mode 100644
index 0000000..4c63c9f
--- /dev/null
+++ b/contrib/management-packs/isilon-onefs-mpack/src/main/tools/hdfs_to_onefs_convert.py
@@ -0,0 +1,531 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import urllib2, base64, json, ssl, time, random, sys
+from optparse import OptionParser
+from contextlib import closing
+
+class SslContext:
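+  """Builds an ssl.SSLContext for https URLs, picking the newest TLS protocol available; returns None for plain http or Pythons without SSLContext."""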
+ def build(self, url):
+ if not url.startswith('https') or not hasattr(ssl, 'SSLContext'):
+ return None
+    return ssl.SSLContext(self._protocol()) if self._protocol() else ssl.create_default_context()
+
+ def _protocol(self):
+ if hasattr(ssl, 'PROTOCOL_TLS'): return ssl.PROTOCOL_TLS
+ elif hasattr(ssl, 'PROTOCOL_TLSv1_2'): return ssl.PROTOCOL_TLSv1_2
+ elif hasattr(ssl, 'PROTOCOL_TLSv1_1'): return ssl.PROTOCOL_TLSv1_1
+ elif hasattr(ssl, 'PROTOCOL_TLSv1'): return ssl.PROTOCOL_TLSv1
+ else: return None
+
+class PermissiveSslContext:
+ def build(self, url):
+ context = SslContext().build(url)
+ if hasattr(context, '_https_verify_certificates'):
+ context._https_verify_certificates(False)
+ return context
+
+class Url:
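+  """Immutable URL wrapper; the / operator appends path segments and query_params() adds a query string."""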
+ @classmethod
+ def base(clazz, protocol, host, port):
+ return clazz('%s://%s:%d' % (protocol, host, port))
+
+ def __init__(self, url_str):
+ self.base = url_str.rstrip('/')
+
+ def __div__(self, suffix_url):
+ suffix_str = str(suffix_url)
+ if self._is_absolute(suffix_str):
+ return Url(suffix_str)
+ else:
+      return Url(self.base + (suffix_str if suffix_str.startswith('/') else '/' + suffix_str))
+
+ def _is_absolute(self, suffix_str):
+ return suffix_str.startswith(self.base)
+
+ def query_params(self, a_dict):
+    return Url(self.base + '?' + '&'.join('%s=%s' % (name, value) for name, value in a_dict.items()))
+
+ def __str__(self):
+ return self.base
+
+class Header:
+ @classmethod
+ def csrf(clazz):
+ return clazz('X-Requested-By', 'ambari')
+
+ def __init__(self, key, value):
+ self.key, self.value = key, value
+
+ def add_to(self, request):
+ request.add_header(self.key, self.value)
+
+class BasicAuth:
+ def __init__(self, user, password):
+ self.header = Header(
+ 'Authorization',
+      'Basic %s' % base64.encodestring('%s:%s' % (user, password)).replace('\n', ''))
+
+ def authenticate(self, request):
+ self.header.add_to(request)
+
+class ResponseTransformer:
+ @staticmethod
+ def identity():
+ return lambda url, code, data: (code, data)
+
+ def __call__(self, url, code, data):
+ raise RuntimeError('Subclass responsibility')
+
+class UnexpectedHttpCode(Exception): pass
+
+class JsonTransformer(ResponseTransformer):
+ def __call__(self, url, code, data):
+ if 200 <= code <= 299:
+ return code, self._parse(data)
+ else:
+      raise UnexpectedHttpCode('Unexpected http code: %d url: %s response: %s' % (code, url, data))
+
+ def _parse(self, a_str):
+ if not a_str:
+ return {}
+ try:
+ return json.loads(a_str)
+ except ValueError as e:
+ raise ValueError('Error %s while parsing: %s' % (e, a_str))
+
+class RestClient:
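+  """Minimal urllib2-based REST client that applies authentication, extra headers and request/response transformers to every call."""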
+  def __init__(self, an_url, authenticator, headers=[], ssl_context=SslContext(), request_transformer=lambda r:r, response_transformer=ResponseTransformer.identity()):
+ self.base_url = an_url
+ self.authenticator = authenticator
+ self.headers = headers
+ self.ssl_context = ssl_context
+ self.request_transformer = request_transformer
+ self.response_transformer = response_transformer
+
+ def get(self, suffix_str):
+ return self._response(*self._request(suffix_str, 'GET'))
+
+ def post(self, suffix_str, data):
+ return self._response(*self._request(suffix_str, 'POST', data=data))
+
+ def put(self, suffix_str, data):
+ return self._response(*self._request(suffix_str, 'PUT', data=data))
+
+ def delete(self, suffix_str):
+ return self._response(*self._request(suffix_str, 'DELETE'))
+
+ def _request(self, suffix_str, http_method, data=""):
+ url = str(self.base_url / suffix_str)
+ request = urllib2.Request(url, data=self.request_transformer(data))
+ request.get_method = lambda: http_method
+ self.authenticator.authenticate(request)
+ map(lambda each: each.add_to(request), self.headers)
+ return request, self.ssl_context.build(url)
+
+ def _response(self, request, ssl_context):
+ with closing(urllib2.urlopen(request, context=ssl_context)) as response:
+      return self.response_transformer(request.get_full_url(), response.getcode(), response.read())
+
+ def rebased(self, new_base_url):
+ return RestClient(
+ new_base_url,
+ self.authenticator,
+ self.headers,
+ self.ssl_context,
+ self.request_transformer,
+ self.response_transformer)
+
+class ServiceComponent:
+ def __init__(self, client, a_dict):
+ self.client = client
+ self.name = a_dict['ServiceComponentInfo']['component_name']
+ self.component = a_dict
+
+ def host_names(self):
+    return [each['HostRoles']['host_name'] for each in self.component['host_components']]
+
+ def __str__(self):
+ return self.name
+
+class Service:
+ def __init__(self, client, a_dict):
+ self.client = client
+ self.service = a_dict
+ self.href = self.service['href']
+ self.name = self.service['ServiceInfo']['service_name']
+
+ def delete(self):
+ try:
+ self.client.delete(self.href)
+ except urllib2.HTTPError as e:
+ if e.code != 404:
+ raise e
+
+ def start(self):
+    _, data = self.client.put(self.href, {'ServiceInfo': {'state' : 'STARTED'}})
+ return AsyncResult.of(self.client, data)
+
+ def components(self):
+    return [ServiceComponent(self.client, self.client.get(each['href'])[1]) for each in self.service['components']]
+
+ def component(self, component_name):
+    matches = [each for each in self.components() if each.name == component_name]
+ return matches[0] if matches else None
+
+ def __str__(self):
+ return self.name
+
+class Cluster:
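+  """Facade over the Ambari cluster REST API: services, components, configurations and cluster-wide start/stop requests."""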
+  def __init__(self, cluster_name, host, port=8080, protocol='http', user='admin', password='admin', api_version='v1'):
+ self.cluster_name = cluster_name
+ self.base_url = Url.base(protocol, host, port) / 'api' / api_version
+ self.client = RestClient(
+ self.base_url / 'clusters' / cluster_name,
+ BasicAuth(user, password),
+ headers=[Header.csrf()],
+ ssl_context=PermissiveSslContext(),
+ request_transformer=json.dumps,
+ response_transformer=JsonTransformer())
+
+ def version(self):
+ _, data = self.client.get('')
+ return data['Clusters']['version']
+
+ def installed_stack(self):
+    stack_name, stack_ver = self.version().split('-')
+    return Stack(stack_name, stack_ver, self.client.rebased(self.base_url / 'stacks'))
+
+ def add_service(self, service_name):
+    self.client.post(Url('services') / service_name, {'ServiceInfo' : {'service_name' : service_name}})
+
+ def add_service_component(self, service_name, component_name):
+    self.client.post(Url('services') / service_name / 'components' / component_name, {})
+
+ def add_host_component(self, service_name, component_name, host_name):
+ self.client.post(
+ Url('hosts').query_params({'Hosts/host_name': host_name}),
+ {'host_components': [{'HostRoles': {'component_name': component_name}}]})
+    _, data = self.client.put(Url('services') / service_name, {'ServiceInfo': {'state' : 'INSTALLED'}})
+ return AsyncResult.of(self.client, data)
+
+ def service(self, service_name):
+ _, data = self.client.get(Url('services') / service_name)
+ return Service(self.client, data)
+
+ def services(self):
+ _, data = self.client.get(Url('services'))
+    return [Service(self.client, self.client.get(each['href'])[1]) for each in data['items']]
+
+ def has_service(self, service_name):
+ return service_name in [each.name for each in self.services()]
+
+ def add_config(self, config_type, tag, properties):
+ self.client.post(Url('configurations'), {
+ 'type': config_type,
+ 'tag': tag,
+ 'properties' : properties
+ })
+ self.client.put('', {
+ 'Clusters' : {
+ 'desired_configs': {'type': config_type, 'tag' : tag }
+ }
+ })
+
+ def config(self, config_type):
+    code, data = self.client.get(Url('configurations').query_params({'type': config_type}))
+    return Configs(self.client, [Config(self.client, each) for each in data['items']])
+
+ def start_all(self):
+ _, data = self.client.put('services', {
+ 'RequestInfo' : {
+ 'context' : '_PARSE_.START.ALL_SERVICES',
+        'operation_level' : { 'level' : 'CLUSTER', 'cluster_name' : self.cluster_name }
+ },
+ 'Body' : { 'ServiceInfo' : {'state' : 'STARTED'} }
+ })
+ return AsyncResult.of(self.client, data)
+
+ def stop_all(self):
+ _, data = self.client.put('services', {
+ 'RequestInfo' : {
+ 'context' : '_PARSE_.STOP.ALL_SERVICES',
+        'operation_level' : { 'level' : 'CLUSTER', 'cluster_name' : self.cluster_name }
+ },
+ 'Body' : { 'ServiceInfo' : {'state' : 'INSTALLED'} }
+ })
+ return AsyncResult.of(self.client, data)
+
+ def __str__(self):
+ return 'Cluster: %s (%s)' % (self.cluster_name, self.client.base_url)
+
+class OperationFailed(Exception): pass
+
+class AsyncResult:
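+  """Tracks a long-running Ambari request; await() polls the request status until it reaches a terminal state and raises OperationFailed unless it completed."""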
+ @staticmethod
+ def of(client, data):
+ return AsyncResult(client, data) if data else NoResult()
+
+ def __init__(self, client, a_dict):
+ self.client = client
+ self.status = a_dict['Requests']['status']
+ self.id = a_dict['Requests']['id']
+ self.href = a_dict['href']
+
+ def request_status(self):
+ _, data = self.client.get(self.href)
+ return data['Requests']['request_status']
+
+ def is_finished(self):
+    return self.request_status() in ['FAILED', 'TIMEDOUT', 'ABORTED', 'COMPLETED', 'SKIPPED_FAILED']
+
+ def await(self):
+ while not self.is_finished():
+ time.sleep(1)
+ status = self.request_status()
+ if status != 'COMPLETED':
+ raise OperationFailed("%s failed with status: %s" % (self.id, status))
+ return status
+
+ def __str__(self):
+ return "Request status: %s id: %d" % (self.status, self.id)
+
+class NoResult:
+  def request_status(self): return 'UNKNOWN'
+ def is_finished(self): return True
+ def await(self): pass
+
+class Config:
+ def __init__(self, client, a_dict):
+ self.client = client
+ self.config = a_dict
+
+ def version(self):
+ return int(self.config['version'])
+
+ def href(self):
+ return self.config['href']
+
+ def properties(self):
+ code, data = self.client.get(self.href())
+ return data['items'][0]['properties']
+
+ def __str__(self):
+ return json.dumps(self.config)
+
+class Configs:
+ def __init__(self, client, config_list):
+ self.client = client
+ self.configs = sorted(config_list, key=lambda config: config.version())
+
+ def latest(self):
+ return self.configs[-1]
+
+
+class Stack:
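+  """Represents the installed stack (name and version) and checks whether a service definition exists in it."""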
+ def __init__(self, stack_name, stack_version, client):
+ self.name = stack_name
+ self.version = stack_version
+ self.client = client
+
+ def has_service(self, service_name):
+ try:
+      _, data = self.client.get(Url(self.name) / 'versions' / self.version / 'services' / service_name)
+ return True
+ except urllib2.HTTPError as e:
+ if e.code == 404:
+ return False
+ else:
+ raise e
+
+class CannotLoad(Exception): pass
+
+class FsStorage:
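+  """Saves and loads intermediate conversion data as repr()-ed Python literals in local 'saved-*' files, so an interrupted run can be resumed."""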
+ def save(self, key, value):
+ with open("saved-" + key, 'wt') as f:
+ f.write(repr(value))
+
+ def load(self, key):
+ try:
+ with open("saved-" + key, 'rt') as f:
+ return eval(f.read())
+ except IOError as e:
+ raise CannotLoad(key + ' not found')
+
+class Conversion:
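+  """Drives the HDFS to ONEFS conversion: collects HDFS client hosts and configs, deletes HDFS, adds and configures ONEFS, then restarts all services."""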
+ def __init__(self, cluster, storage):
+ self.cluster = cluster
+ self.storage = storage
+ self.supported_stacks = ['HDP-3.0']
+
+ def check_prerequisites(self):
+ print 'Checking %s' % self.cluster
+ ver = self.cluster.version()
+ print 'Found stack %s' % ver
+ if ver not in self.supported_stacks:
+ print 'Only %s stacks are supported.' % self.supported_stacks
+ return False
+ if not self.cluster.installed_stack().has_service('ONEFS'):
+ print 'ONEFS management pack is not installed.'
+ return False
+    sys.stdout.write('Please confirm you have made a backup of the Ambari DB [y/n] (n)? ')
+ if raw_input() != 'y':
+ return False
+ return True
+
+ def perform(self):
+ hdfs_client_hosts = self.find_hdfs_client_hosts()
+ self.stop_all_services()
+ self.read_configs()
+ self.delete_hdfs()
+ self.add_onefs()
+ self.configure_onefs()
+ self.install_onefs_clients(hdfs_client_hosts)
+ self.start_all_services()
+
+ def find_hdfs_client_hosts(self):
+ if self.cluster.has_service('HDFS'):
+ print 'Collecting hosts with HDFS_CLIENT'
+      hdfs_client_hosts = self.cluster.service('HDFS').component('HDFS_CLIENT').host_names()
+ self.storage.save('hdfs_client_hosts', hdfs_client_hosts)
+ else:
+ print 'Using previously saved HDFS client hosts'
+ hdfs_client_hosts = self.storage.load('hdfs_client_hosts')
+ print 'Found hosts %s' % hdfs_client_hosts
+ return hdfs_client_hosts
+
+ def stop_all_services(self):
+ print 'Stopping all services..'
+ self.cluster.stop_all().await()
+
+ def read_configs(self):
+ if self.cluster.has_service('HDFS'):
+ print 'Downloading core-site..'
+ self.core_site = self.cluster.config('core-site').latest().properties()
+ print 'Downloading hdfs-site..'
+ self.hdfs_site = self.cluster.config('hdfs-site').latest().properties()
+ print 'Downloading hadoop-env..'
+ self.hadoop_env = self.cluster.config('hadoop-env').latest().properties()
+ self.storage.save('core-site', self.core_site)
+ self.storage.save('hdfs-site', self.hdfs_site)
+ self.storage.save('hadoop-env', self.hadoop_env)
+ else:
+ print 'Using previously saved HDFS configs'
+ self.core_site = self.storage.load('core-site')
+ self.hdfs_site = self.storage.load('hdfs-site')
+ self.hadoop_env = self.storage.load('hadoop-env')
+
+ def delete_hdfs(self):
+ print 'Deleting HDFS..'
+ if self.cluster.has_service('HDFS'):
+ self.cluster.service('HDFS').delete()
+ else:
+ print 'Already deleted.'
+
+ def add_onefs(self):
+ print 'Adding ONEFS..'
+ if self.cluster.has_service('ONEFS'):
+ print 'Already added.'
+ else:
+ self.cluster.add_service('ONEFS')
+ try:
+ self.cluster.add_service_component('ONEFS', 'ONEFS_CLIENT')
+ except urllib2.HTTPError as e:
+ if e.code != 409:
+ raise e
+
+ def configure_onefs(self):
+ print 'Adding ONEFS config..'
+    self.cluster.add_config('onefs', random_tag('onefs'), { "onefs_host" : self.smart_connect_zone(self.core_site) })
+ print 'Adding core-site'
+    self.cluster.add_config('core-site', random_tag('new-core-site'), self.core_site)
+ print 'Adding hdfs-site'
+    self.cluster.add_config('hdfs-site', random_tag('new-hdfs-site'), self.hdfs_site)
+    print 'Adding hadoop-env'
+    self.cluster.add_config('hadoop-env', random_tag('new-hadoop-env'), self.hadoop_env)
+
+ def smart_connect_zone(self, core_site):
+ def_fs = core_site['fs.defaultFS']
+ if '://' in def_fs:
+ def_fs = def_fs.split('://')[1]
+ if ':' in def_fs:
+ def_fs = def_fs.split(':')[0]
+ return def_fs
+
+ def install_onefs_clients(self, hdfs_client_hosts):
+ print 'Adding ONEFS_CLIENT to hosts: %s' % (hdfs_client_hosts)
+ results = [self.add_onefs_client(each) for each in hdfs_client_hosts]
+ for each in results:
+ each.await()
+
+ def add_onefs_client(self, hostname):
+ try:
+ return self.cluster.add_host_component('ONEFS', 'ONEFS_CLIENT', hostname)
+ except urllib2.HTTPError as e:
+ if e.code == 409:
+ print 'Already added to host %s' % hostname
+ return NoResult()
+ else:
+ raise e
+
+
+ def start_all_services(self):
+ print 'Starting all services..'
+ self.cluster.start_all().await()
+
+def random_tag(tag_name): return "%s-%s" % (tag_name, time.time())
+
+class CommandLine:
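+  """Defines and validates the command line options of the script."""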
+ def __init__(self):
+ self.parser = OptionParser()
+    self.parser.add_option("-o", '--host', dest='host', help='Ambari server host', default='localhost')
+    self.parser.add_option("-p", '--port', dest='port', help='Ambari server port', default='8080')
+    self.parser.add_option("-c", '--cluster', dest='cluster_name', help='Cluster name')
+    self.parser.add_option("-u", '--user', dest='admin_user', help='Admin user name', default='admin')
+    self.parser.add_option("-k", '--password', dest='admin_pass', help='Admin password', default='admin')
+    self.parser.add_option("-t", '--protocol', dest='protocol', help='HTTP protocol', default='http')
+
+ def parse_options(self):
+ options, args = self.parser.parse_args()
+ if not options.cluster_name:
+ self.parser.error('Missing cluster name.')
+    if not options.protocol or options.protocol.lower() not in ['http', 'https']:
+ self.parser.error('Invalid protocol. Use http or https.')
+ if not options.port or not options.port.isdigit():
+ self.parser.error('Port should be an integer')
+ return options
+
+if __name__ == '__main__':
+ options = CommandLine().parse_options()
+ cluster = Cluster(
+ options.cluster_name,
+ options.host,
+ port=int(options.port),
+ protocol=options.protocol.lower(),
+ user=options.admin_user,
+ password=options.admin_pass)
+  print 'This script will replace the HDFS service with ONEFS'
+  print 'The following prerequisites are required:'
+  print ' * ONEFS management pack must be installed'
+  print ' * Ambari must be upgraded to >=v2.7.0'
+  print ' * Stack must be upgraded to HDP-3.0'
+  print ' * It is highly recommended to back up the Ambari database before you proceed.'
+ conversion = Conversion(cluster, FsStorage())
+ if not conversion.check_prerequisites():
+ sys.exit()
+ else:
+ conversion.perform()
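For reference, the new script is a Python 2 command-line tool driven by the options defined in its CommandLine class; a hypothetical invocation against an upgraded HDP-3.0 cluster might look like the following (the host name and cluster name below are placeholders, not part of the commit):

    python hdfs_to_onefs_convert.py --host ambari.example.com --port 8080 --cluster MyCluster --user admin --password admin --protocol http

Before making any changes, the script verifies the stack version and the presence of the ONEFS management pack, and asks for confirmation that the Ambari database has been backed up; only then does it replace the HDFS service with ONEFS.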