This is an automated email from the ASF dual-hosted git repository.

amagyar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 726d1d0  AMBARI-24708. Automation script for upgrade old style isilon 
cluster to the new mpack based structure (amagyar) (#2397)
726d1d0 is described below

commit 726d1d016ec8eae1056a1412f8b857927dfc32ad
Author: Attila Magyar <m.magy...@gmail.com>
AuthorDate: Mon Oct 1 13:40:54 2018 +0200

    AMBARI-24708. Automation script for upgrade old style isilon cluster to the 
new mpack based structure (amagyar) (#2397)
    
    * AMBARI-24708. Automation script for upgrade old style isilon cluster to 
the new mpack based structure (amagyar)
    
    * AMBARI-24708. Automation script for upgrade old style isilon cluster to 
the new mpack based structure (amagyar)
---
 .../src/main/tools/hdfs_to_onefs_convert.py        | 531 +++++++++++++++++++++
 1 file changed, 531 insertions(+)

diff --git 
a/contrib/management-packs/isilon-onefs-mpack/src/main/tools/hdfs_to_onefs_convert.py
 
b/contrib/management-packs/isilon-onefs-mpack/src/main/tools/hdfs_to_onefs_convert.py
new file mode 100644
index 0000000..4c63c9f
--- /dev/null
+++ 
b/contrib/management-packs/isilon-onefs-mpack/src/main/tools/hdfs_to_onefs_convert.py
@@ -0,0 +1,531 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import urllib2, base64, json, ssl, time, random, sys
+from optparse import OptionParser
+from contextlib import closing
+  
class SslContext:
  """Builds the ssl.SSLContext used for HTTPS requests.

  build() returns None for non-HTTPS URLs, or when this interpreter has no
  ssl.SSLContext, so urllib2 falls back to its default behaviour.
  """
  def build(self, url):
    if not url.startswith('https') or not hasattr(ssl, 'SSLContext'):
      return None
    protocol = self._protocol()
    if protocol is None:
      return ssl.create_default_context()
    return ssl.SSLContext(protocol)

  def _protocol(self):
    """Return the most generic TLS protocol constant available, or None."""
    if hasattr(ssl, 'PROTOCOL_TLS'): return ssl.PROTOCOL_TLS
    elif hasattr(ssl, 'PROTOCOL_TLSv1_2'): return ssl.PROTOCOL_TLSv1_2
    elif hasattr(ssl, 'PROTOCOL_TLSv1_1'): return ssl.PROTOCOL_TLSv1_1
    elif hasattr(ssl, 'PROTOCOL_TLSv1'): return ssl.PROTOCOL_TLSv1
    else: return None

class PermissiveSslContext:
  """Like SslContext, but with certificate and hostname verification switched off.

  Intended for Ambari servers with self-signed certificates.
  """
  def build(self, url):
    context = SslContext().build(url)
    if context is None:
      return None
    # check_hostname must be disabled before verify_mode may be set to CERT_NONE.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    return context
+
class Url:
  """A URL string that composes with ``/`` to build REST paths.

  ``url / 'segment'`` appends a path segment. A suffix that is already an
  absolute URL (it starts with this base, or carries its own ``scheme://``)
  replaces the base instead.
  """
  @classmethod
  def base(clazz, protocol, host, port):
    """Build the root URL ``protocol://host:port``; ``port`` must be an int."""
    return clazz('%s://%s:%d' % (protocol, host, port))

  def __init__(self, url_str):
    # Trailing slashes are dropped so that composition never produces '//'.
    self.base = url_str.rstrip('/')

  def __div__(self, suffix_url):
    suffix_str = str(suffix_url)
    if self._is_absolute(suffix_str):
      return Url(suffix_str)
    if not suffix_str.startswith('/'):
      suffix_str = '/' + suffix_str
    return Url(self.base + suffix_str)

  # Python 3 (and `from __future__ import division`) dispatch `/` to __truediv__.
  __truediv__ = __div__

  def _is_absolute(self, suffix_str):
    return suffix_str.startswith(self.base) or self._has_scheme(suffix_str)

  @staticmethod
  def _has_scheme(a_str):
    """True if ``a_str`` begins with an RFC 3986 style ``scheme://`` prefix."""
    scheme, separator, _ = a_str.partition('://')
    return (bool(separator) and bool(scheme) and scheme[0].isalpha()
            and all(ch.isalnum() or ch in '+.-' for ch in scheme))

  def query_params(self, a_dict):
    """Return a new Url with ``a_dict`` appended as a (non-encoded) query string."""
    return Url(self.base + '?' + '&'.join('%s=%s' % (name, value) for name, value in a_dict.items()))

  def __str__(self):
    return self.base
+
class Header:
  """A single HTTP header (name/value pair) that can be attached to a request."""
  @classmethod
  def csrf(clazz):
    """The ``X-Requested-By: ambari`` header (presumably Ambari's CSRF guard)."""
    return clazz('X-Requested-By', 'ambari')

  def __init__(self, key, value):
    self.key, self.value = key, value

  def add_to(self, request):
    request.add_header(self.key, self.value)

class BasicAuth:
  """Adds an HTTP Basic ``Authorization`` header to outgoing requests."""
  def __init__(self, user, password):
    credentials = '%s:%s' % (user, password)
    if not isinstance(credentials, bytes):
      credentials = credentials.encode('utf-8')
    # b64encode never inserts line breaks (encodestring did, and is gone in Python 3.9).
    token = base64.b64encode(credentials)
    if not isinstance(token, str):
      token = token.decode('ascii')
    self.header = Header('Authorization', 'Basic %s' % token)

  def authenticate(self, request):
    self.header.add_to(request)
+
class ResponseTransformer:
  """Base for callables that post-process a response.

  A transformer is called as ``transformer(url, code, data)`` and returns a
  ``(code, payload)`` tuple.
  """
  @staticmethod
  def identity():
    """Return a transformer that passes the status code and raw body through unchanged."""
    return lambda url, code, data: (code, data)

  def __call__(self, url, code, data):
    raise RuntimeError('Subclass responsibility')

class UnexpectedHttpCode(Exception):
  """Raised by JsonTransformer for a status code outside the 2xx range."""

class JsonTransformer(ResponseTransformer):
  """Parses a 2xx response body as JSON; any other status code is an error."""
  def __call__(self, url, code, data):
    if not 200 <= code <= 299:
      # Raise rather than return, so callers never unpack an exception object as (code, data).
      raise UnexpectedHttpCode('Unexpected http code: %d url: %s response: %s' % (code, url, data))
    return code, self._parse(data)

  def _parse(self, a_str):
    """Decode ``a_str`` as JSON; an empty body decodes to ``{}``.

    Raises ValueError (including the offending text) on malformed JSON.
    """
    if not a_str:
      return {}
    try:
      return json.loads(a_str)
    except ValueError as e:
      raise ValueError('Error %s while parsing: %s' % (e, a_str))
+
class RestClient:
  """Small HTTP client rooted at a base URL.

  Every request is authenticated by ``authenticator``, decorated with
  ``headers``, and has its body passed through ``request_transformer``. The
  response ``(url, code, body)`` goes through ``response_transformer``, whose
  result is returned. HTTP error statuses surface as ``urllib2.HTTPError``,
  raised by ``urllib2.urlopen``.
  """
  def __init__(self, an_url, authenticator, headers=None, ssl_context=None, request_transformer=lambda r: r, response_transformer=None):
    self.base_url = an_url
    self.authenticator = authenticator
    # None defaults avoid sharing one mutable list/instance between clients.
    self.headers = [] if headers is None else headers
    self.ssl_context = SslContext() if ssl_context is None else ssl_context
    self.request_transformer = request_transformer
    self.response_transformer = ResponseTransformer.identity() if response_transformer is None else response_transformer

  def get(self, suffix_str):
    return self._response(*self._request(suffix_str, 'GET'))

  def post(self, suffix_str, data):
    return self._response(*self._request(suffix_str, 'POST', data=data))

  def put(self, suffix_str, data):
    return self._response(*self._request(suffix_str, 'PUT', data=data))

  def delete(self, suffix_str):
    return self._response(*self._request(suffix_str, 'DELETE'))

  def _request(self, suffix_str, http_method, data=""):
    """Build the urllib2 request and the SSL context for ``base_url / suffix_str``."""
    url = str(self.base_url / suffix_str)
    request = urllib2.Request(url, data=self.request_transformer(data))
    # urllib2 only knows GET/POST natively; override the verb explicitly.
    request.get_method = lambda: http_method
    self.authenticator.authenticate(request)
    for header in self.headers:
      header.add_to(request)
    return request, self.ssl_context.build(url)

  def _response(self, request, ssl_context):
    with closing(urllib2.urlopen(request, context=ssl_context)) as response:
      return self.response_transformer(request.get_full_url(), response.getcode(), response.read())

  def rebased(self, new_base_url):
    """Return a client with identical settings rooted at ``new_base_url``."""
    return RestClient(
      new_base_url,
      self.authenticator,
      self.headers,
      self.ssl_context,
      self.request_transformer,
      self.response_transformer)
+
class ServiceComponent:
  """Read-only view of one service-component resource returned by the Ambari API."""
  def __init__(self, client, a_dict):
    self.client = client
    self.name = a_dict['ServiceComponentInfo']['component_name']
    self.component = a_dict

  def host_names(self):
    """List the host name of every host_components entry, in response order."""
    names = []
    for host_component in self.component['host_components']:
      names.append(host_component['HostRoles']['host_name'])
    return names

  def __str__(self):
    return self.name
+
class Service:
  """Wrapper around one Ambari service resource, as returned by ``services/<name>``."""
  def __init__(self, client, a_dict):
    self.client = client
    self.service = a_dict
    self.href = self.service['href']
    self.name = self.service['ServiceInfo']['service_name']

  def delete(self):
    """Delete this service; a 404 (already deleted) is ignored."""
    try:
      self.client.delete(self.href)
    except urllib2.HTTPError as e:
      if e.code != 404:
        raise

  def start(self):
    """Request state STARTED; returns an AsyncResult (or NoResult) to wait on."""
    _, data = self.client.put(self.href, {'ServiceInfo': {'state' : 'STARTED'}})
    return AsyncResult.of(self.client, data)

  def components(self):
    """Fetch every component of this service (one GET per component href)."""
    return [ServiceComponent(self.client, self.client.get(each['href'])[1]) for each in self.service['components']]

  def component(self, component_name):
    """Return the component called ``component_name``, or None if there is none."""
    matches = [each for each in self.components() if each.name == component_name]
    return matches[0] if matches else None

  def __str__(self):
    return self.name
+
class Cluster:
  """Facade over the Ambari REST API for a single cluster.

  Requests go to ``<protocol>://<host>:<port>/api/<api_version>/clusters/<cluster_name>``.
  They use basic auth and the X-Requested-By header, and skip certificate
  verification. Request bodies are JSON-encoded and 2xx responses
  JSON-decoded.
  """
  def __init__(self, cluster_name, host, port=8080, protocol='http', user='admin', password='admin', api_version='v1'):
    self.cluster_name = cluster_name
    self.base_url = Url.base(protocol, host, port) / 'api' / api_version
    self.client = RestClient(
      self.base_url / 'clusters' / cluster_name,
      BasicAuth(user, password),
      headers=[Header.csrf()],
      ssl_context=PermissiveSslContext(),
      request_transformer=json.dumps,
      response_transformer=JsonTransformer())

  def version(self):
    """Return ``Clusters/version`` of the cluster resource (e.g. ``HDP-3.0``)."""
    _, data = self.client.get('')
    return data['Clusters']['version']

  def installed_stack(self):
    """Return the Stack this cluster runs, queried through the ``stacks`` endpoint."""
    # Use this instance, not a module-level global: the class must work when imported.
    stack_name, stack_ver = self.version().split('-', 1)
    return Stack(stack_name, stack_ver, self.client.rebased(self.base_url / 'stacks'))

  def add_service(self, service_name):
    self.client.post(Url('services') / service_name, {'ServiceInfo' : {'service_name' : service_name}})

  def add_service_component(self, service_name, component_name):
    self.client.post(Url('services') / service_name / 'components' / component_name, {})

  def add_host_component(self, service_name, component_name, host_name):
    """Put ``component_name`` on ``host_name``, then request the service state INSTALLED.

    Returns an AsyncResult (or NoResult) for the install request.
    """
    self.client.post(
      Url('hosts').query_params({'Hosts/host_name': host_name}),
      {'host_components': [{'HostRoles': {'component_name': component_name}}]})
    _, data = self.client.put(Url('services') / service_name, {'ServiceInfo': {'state' : 'INSTALLED'}})
    return AsyncResult.of(self.client, data)

  def service(self, service_name):
    _, data = self.client.get(Url('services') / service_name)
    return Service(self.client, data)

  def services(self):
    """Return all services, fetching each service's full resource by its href."""
    _, data = self.client.get(Url('services'))
    return [Service(self.client, self.client.get(each['href'])[1]) for each in data['items']]

  def has_service(self, service_name):
    return service_name in [each.name for each in self.services()]

  def add_config(self, config_type, tag, properties):
    """Create a config version ``tag`` of ``config_type`` and make it the desired one."""
    self.client.post(Url('configurations'), {
      'type': config_type,
      'tag': tag,
      'properties' : properties
    })
    self.client.put('', {
      'Clusters' : {
        'desired_configs': {'type': config_type, 'tag' : tag }
      }
    })

  def config(self, config_type):
    """Return every stored version of ``config_type`` as a Configs collection."""
    _, data = self.client.get(Url('configurations').query_params({'type': config_type}))
    return Configs(self.client, [Config(self.client, each) for each in data['items']])

  def start_all(self):
    return self._set_all_services_state('_PARSE_.START.ALL_SERVICES', 'STARTED')

  def stop_all(self):
    return self._set_all_services_state('_PARSE_.STOP.ALL_SERVICES', 'INSTALLED')

  def _set_all_services_state(self, context, state):
    """Issue one cluster-level PUT moving every service to ``state``; returns its AsyncResult."""
    _, data = self.client.put('services', {
      'RequestInfo' : {
        'context' : context,
        'operation_level' : { 'level' : 'CLUSTER', 'cluster_name' : self.cluster_name }
      },
      'Body' : { 'ServiceInfo' : {'state' : state} }
    })
    return AsyncResult.of(self.client, data)

  def __str__(self):
    return 'Cluster: %s (%s)' % (self.cluster_name, self.client.base_url)
+
class OperationFailed(Exception):
  """Raised when an asynchronous Ambari request ends in a non-COMPLETED state."""
+
+class AsyncResult:
+  @staticmethod
+  def of(client, data):
+    return AsyncResult(client, data) if data else NoResult()
+
+  def __init__(self, client, a_dict):
+    self.client = client
+    self.status = a_dict['Requests']['status']
+    self.id = a_dict['Requests']['id']
+    self.href = a_dict['href']
+
+  def request_status(self):
+    _, data = self.client.get(self.href)
+    return data['Requests']['request_status']
+
+  def is_finished(self):
+    return self.request_status() in ['FAILED', 'TIMEDOUT', 'ABORTED', 
'COMPLETED', 'SKIPPED_FAILED']
+
+  def await(self):
+    while not self.is_finished():
+      time.sleep(1)
+    status = self.request_status()
+    if status != 'COMPLETED':
+      raise OperationFailed("%s failed with status: %s" % (self.id, status))
+    return status
+
+  def __str__(self):
+    return "Request status: %s id: %d" % (self.status, self.id)
+
+class NoResult:
+  def request_status(): return 'UNKNOWN'
+  def is_finished(self): return True
+  def await(self): pass
+
class Config:
  """One entry of the cluster's ``configurations`` listing (type/tag/version/href)."""
  def __init__(self, client, a_dict):
    self.client, self.config = client, a_dict

  def version(self):
    """Return the entry's version number as an int."""
    raw_version = self.config['version']
    return int(raw_version)

  def href(self):
    return self.config['href']

  def properties(self):
    """Fetch this config version and return its ``properties`` dict."""
    _, payload = self.client.get(self.href())
    first_item = payload['items'][0]
    return first_item['properties']

  def __str__(self):
    return json.dumps(self.config)
+
class Configs:
  """Config versions of one type, ordered by ascending version number."""
  def __init__(self, client, config_list):
    self.client = client
    ordered = list(config_list)
    ordered.sort(key=lambda item: item.version())
    self.configs = ordered

  def latest(self):
    """Return the highest-version entry (IndexError when there are none)."""
    return self.configs[len(self.configs) - 1]
+
+
class Stack:
  """A stack (e.g. HDP) at a given version, queried through the ``stacks`` endpoint."""
  def __init__(self, stack_name, stack_version, client):
    self.name = stack_name
    self.version = stack_version
    self.client = client

  def has_service(self, service_name):
    """True if the stack definition contains ``service_name`` (a 404 means it does not).

    Any other HTTP error is re-raised with its original traceback.
    """
    try:
      self.client.get(Url(self.name) / 'versions' / self.version / 'services' / service_name)
    except urllib2.HTTPError as e:
      if e.code == 404:
        return False
      raise
    return True
+
class CannotLoad(Exception):
  """Raised by FsStorage.load when no value has been saved under the requested key."""
+
class FsStorage:
  """Persists Python literals (lists/dicts of strings) as ``saved-<key>`` files in the working directory.

  Saved state lets the conversion be re-run after HDFS has already been deleted.
  """
  def save(self, key, value):
    with open(self._path(key), 'wt') as f:
      f.write(repr(value))

  def load(self, key):
    """Return the value saved under ``key``.

    Raises CannotLoad if nothing was saved, and ValueError if the file holds
    anything other than a plain literal.
    """
    import ast
    try:
      with open(self._path(key), 'rt') as f:
        text = f.read()
    except IOError:
      raise CannotLoad(key + ' not found')
    # literal_eval, unlike eval, never executes code found in the file.
    return ast.literal_eval(text)

  def _path(self, key):
    return "saved-" + key
+
+class Conversion:
+  def __init__(self, cluster, storage):
+    self.cluster = cluster
+    self.storage = storage
+    self.supported_stacks = ['HDP-3.0']
+
+  def check_prerequisites(self):
+    print 'Checking %s' % self.cluster
+    ver = self.cluster.version()
+    print 'Found stack %s' % ver
+    if ver not in self.supported_stacks:
+      print 'Only %s stacks are supported.' % self.supported_stacks
+      return False
+    if not self.cluster.installed_stack().has_service('ONEFS'):
+      print 'ONEFS management pack is not installed.'
+      return False
+    sys.stdout.write('Please, confirm you have made backup of the Ambari db 
[y/n] (n)? ')
+    if raw_input() != 'y':
+      return False
+    return True
+
+  def perform(self):
+    hdfs_client_hosts = self.find_hdfs_client_hosts()
+    self.stop_all_services()
+    self.read_configs()
+    self.delete_hdfs()
+    self.add_onefs()
+    self.configure_onefs()
+    self.install_onefs_clients(hdfs_client_hosts)
+    self.start_all_services()
+
+  def find_hdfs_client_hosts(self):
+    if self.cluster.has_service('HDFS'):
+      print 'Collecting hosts with HDFS_CLIENT'
+      hdfs_client_hosts = 
self.cluster.service('HDFS').component('HDFS_CLIENT').host_names()
+      self.storage.save('hdfs_client_hosts', hdfs_client_hosts)
+    else:
+      print 'Using previously saved HDFS client hosts'
+      hdfs_client_hosts = self.storage.load('hdfs_client_hosts')
+    print 'Found hosts %s' % hdfs_client_hosts
+    return hdfs_client_hosts
+
+  def stop_all_services(self):
+    print 'Stopping all services..' 
+    self.cluster.stop_all().await()
+
+  def read_configs(self):
+    if self.cluster.has_service('HDFS'):
+      print 'Downloading core-site..'
+      self.core_site = self.cluster.config('core-site').latest().properties()
+      print 'Downloading hdfs-site..'
+      self.hdfs_site = self.cluster.config('hdfs-site').latest().properties()
+      print 'Downloading hadoop-env..'
+      self.hadoop_env = self.cluster.config('hadoop-env').latest().properties()
+      self.storage.save('core-site', self.core_site)
+      self.storage.save('hdfs-site', self.hdfs_site)
+      self.storage.save('hadoop-env', self.hadoop_env)
+    else:
+      print 'Using previously saved HDFS configs'
+      self.core_site = self.storage.load('core-site')
+      self.hdfs_site = self.storage.load('hdfs-site')
+      self.hadoop_env = self.storage.load('hadoop-env')
+
+  def delete_hdfs(self):
+    print 'Deleting HDFS..'
+    if self.cluster.has_service('HDFS'):
+      self.cluster.service('HDFS').delete()
+    else:
+      print 'Already deleted.'
+  
+  def add_onefs(self):
+    print 'Adding ONEFS..'
+    if self.cluster.has_service('ONEFS'):
+      print 'Already added.'
+    else:
+      self.cluster.add_service('ONEFS')
+    try:
+      self.cluster.add_service_component('ONEFS', 'ONEFS_CLIENT')
+    except urllib2.HTTPError as e:
+      if e.code != 409:
+        raise e
+
+  def configure_onefs(self):
+    print 'Adding ONEFS config..'
+    self.cluster.add_config('onefs', random_tag('onefs'), { "onefs_host" : 
self.smart_connect_zone(self.core_site) })
+    print 'Adding core-site'
+    self.cluster.add_config('core-site', random_tag('new-core-site'), 
self.core_site)
+    print 'Adding hdfs-site'
+    self.cluster.add_config('hdfs-site', random_tag('new-hdfs-site'), 
self.hdfs_site)
+    print 'Adding hadoop-env-site'
+    self.cluster.add_config('hadoop-env', random_tag('new-hadoop-env'), 
self.hadoop_env)
+
+  def smart_connect_zone(self, core_site):
+    def_fs = core_site['fs.defaultFS']
+    if '://' in def_fs:
+      def_fs = def_fs.split('://')[1]
+    if ':' in def_fs:
+      def_fs = def_fs.split(':')[0]
+    return def_fs
+
+  def install_onefs_clients(self, hdfs_client_hosts):
+    print 'Adding ONEFS_CLIENT to hosts: %s' % (hdfs_client_hosts)
+    results = [self.add_onefs_client(each) for each in hdfs_client_hosts]
+    for each in results:
+      each.await() 
+
+  def add_onefs_client(self, hostname):
+    try:
+      return self.cluster.add_host_component('ONEFS', 'ONEFS_CLIENT', hostname)
+    except urllib2.HTTPError as e:
+      if e.code == 409:
+        print 'Already added to host %s' % hostname
+        return NoResult() 
+      else:
+        raise e
+
+
+  def start_all_services(self):
+    print 'Starting all services..'
+    self.cluster.start_all().await()
+
def random_tag(tag_name):
  """Return ``<tag_name>-<current epoch time>``, a practically unique config tag."""
  stamp = time.time()
  return "%s-%s" % (tag_name, stamp)
+
class CommandLine:
  """Command-line options for the conversion script."""
  def __init__(self):
    self.parser = OptionParser()
    self.parser.add_option("-o", '--host', dest='host', help='Ambari server host', default='localhost')
    self.parser.add_option("-p", '--port', dest='port', help='Ambari server port', default='8080')
    self.parser.add_option("-c", '--cluster', dest='cluster_name', help='Cluster name')
    self.parser.add_option("-u", '--user', dest='admin_user', help='Admin user name', default='admin')
    self.parser.add_option("-k", '--password', dest='admin_pass', help='Admin user password', default='admin')
    self.parser.add_option("-t", '--protocol', dest='protocol', help='HTTP protocol', default='http')

  def parse_options(self, argv=None):
    """Parse and validate ``argv`` (``sys.argv[1:]`` when None).

    Invalid input makes OptionParser print a message and exit (SystemExit).
    """
    options, _ = self.parser.parse_args(argv)
    if not options.cluster_name:
      self.parser.error('Missing cluster name.')
    if not options.protocol or options.protocol.lower() not in ['http', 'https']:
      self.parser.error('Invalid protocol. Use http or https.')
    if not options.port or not options.port.isdigit():
      self.parser.error('Port should be an integer')
    return options
+
+if __name__ == '__main__':
+  options = CommandLine().parse_options()
+  cluster = Cluster(
+    options.cluster_name,
+    options.host,
+    port=int(options.port),
+    protocol=options.protocol.lower(),
+    user=options.admin_user,
+    password=options.admin_pass)
+  print 'This script will replace the HDFS service to ONEFS'
+  print 'The following prerequisites are required:'
+  print '  * ONEFS management package must be installed'
+  print '  * Ambari must be upgraded to >=v2.7.0'
+  print '  * Stack must be upgraded to HDP-3.0'
+  print '  * Is highly recommended to backup ambari database before you 
proceed.'
+  conversion = Conversion(cluster, FsStorage())
+  if not conversion.check_prerequisites():
+    sys.exit()
+  else:
+    conversion.perform()

Reply via email to