Author: tomwhite
Date: Tue Jan 5 22:54:51 2010
New Revision: 896259
URL: http://svn.apache.org/viewvc?rev=896259&view=rev
Log:
HADOOP-6466. Add a ZooKeeper service to the cloud scripts.
Added:
hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/contrib/cloud/README.txt
hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py
hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py
Modified: hadoop/common/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=896259&r1=896258&r2=896259&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue Jan 5 22:54:51 2010
@@ -33,6 +33,8 @@
HADOOP-6415. Adds a common token interface for both job token and
delegation token. (Kan Zhang via ddas)
+ HADOOP-6466. Add a ZooKeeper service to the cloud scripts. (tomwhite)
+
IMPROVEMENTS
HADOOP-6283. Improve the exception messages thrown by
Modified: hadoop/common/trunk/src/contrib/cloud/README.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/README.txt?rev=896259&r1=896258&r2=896259&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/README.txt (original)
+++ hadoop/common/trunk/src/contrib/cloud/README.txt Tue Jan 5 22:54:51 2010
@@ -316,3 +316,24 @@
It's possible to use any image, as long as it i) runs (gzip compressed) user
data on boot, and ii) has Java installed.
+OTHER SERVICES
+==============
+
+ZooKeeper
+=========
+
+You can run ZooKeeper by setting the "service" parameter to "zookeeper". For
+example:
+
+[my-zookeeper-cluster]
+service=zookeeper
+ami=ami-ed59bf84
+instance_type=m1.small
+key_name=tom
+availability_zone=us-east-1c
+public_key=PATH_TO_PUBLIC_KEY
+private_key=PATH_TO_PRIVATE_KEY
+
+Then to launch a three-node ZooKeeper ensemble, run:
+
+% ./hadoop-ec2 launch-cluster my-zookeeper-cluster 3 zk
Modified: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py?rev=896259&r1=896258&r2=896259&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/cli.py Tue Jan 5 22:54:51 2010
@@ -18,8 +18,8 @@
import ConfigParser
from hadoop.cloud import VERSION
from hadoop.cloud.cluster import get_cluster
+from hadoop.cloud.service import get_service
from hadoop.cloud.service import InstanceTemplate
-from hadoop.cloud.service import HadoopService
from hadoop.cloud.service import NAMENODE
from hadoop.cloud.service import SECONDARY_NAMENODE
from hadoop.cloud.service import JOBTRACKER
@@ -33,6 +33,7 @@
import os
import sys
+DEFAULT_SERVICE_NAME = 'hadoop'
DEFAULT_CLOUD_PROVIDER = 'ec2'
DEFAULT_CONFIG_DIR_NAME = '.hadoop-cloud'
@@ -183,8 +184,10 @@
cluster_name = args[0]
opt = merge_config_with_options(cluster_name, config, options_dict)
logging.debug("Options: %s", str(opt))
- cluster = get_cluster(get_cloud_provider(opt))(cluster_name, config_dir)
- service = get_service(cluster)
+ service_name = get_service_name(opt)
+ cloud_provider = get_cloud_provider(opt)
+ cluster = get_cluster(cloud_provider)(cluster_name, config_dir)
+ service = get_service(service_name, cloud_provider)(cluster)
return (opt, args, service)
def parse_options(command, option_list=[], expected_arguments=(),
@@ -223,15 +226,18 @@
config_dir = DEFAULT_CONFIG_DIR
return config_dir
+def get_service_name(options_dict):
+ service_name = options_dict.get("service", None)
+ if service_name is None:
+ service_name = DEFAULT_SERVICE_NAME
+ return service_name
+
def get_cloud_provider(options_dict):
provider = options_dict.get("cloud_provider", None)
if provider is None:
provider = DEFAULT_CLOUD_PROVIDER
return provider
-def get_service(cluster):
- return HadoopService(cluster)
-
def check_options_set(options, option_names):
for option_name in option_names:
if options.get(option_name) is None:
@@ -268,8 +274,10 @@
if command == 'list':
(opt, args) = parse_options(command, BASIC_OPTIONS, unbounded_args=True)
if len(args) == 0:
- service = get_service(None)
- service.list_all(get_cloud_provider(opt))
+ service_name = get_service_name(opt)
+ cloud_provider = get_cloud_provider(opt)
+ service = get_service(service_name, cloud_provider)(None)
+ service.list_all(cloud_provider)
else:
(opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
service.list()
Added:
hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh?rev=896259&view=auto
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh (added)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/data/zookeeper-ec2-init-remote.sh Tue Jan 5 22:54:51 2010
@@ -0,0 +1,112 @@
+#!/bin/bash -x
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+################################################################################
+# Script that is run on each EC2 instance on boot. It is passed in the EC2 user
+# data, so should not exceed 16K in size after gzip compression.
+#
+# This script is executed by /etc/init.d/ec2-run-user-data, and output is
+# logged to /var/log/messages.
+################################################################################
+
+################################################################################
+# Initialize variables
+################################################################################
+
+# Substitute environment variables passed by the client
+export %ENV%
+
+ZK_VERSION=${ZK_VERSION:-3.2.2}
+ZOOKEEPER_HOME=/usr/local/zookeeper-$ZK_VERSION
+ZK_CONF_DIR=/etc/zookeeper/conf
+
+function register_auto_shutdown() {
+ if [ ! -z "$AUTO_SHUTDOWN" ]; then
+ shutdown -h +$AUTO_SHUTDOWN >/dev/null &
+ fi
+}
+
+# Install a list of packages on debian or redhat as appropriate
+function install_packages() {
+ if which dpkg &> /dev/null; then
+ apt-get update
+ apt-get -y install $@
+ elif which rpm &> /dev/null; then
+ yum install -y $@
+ else
+ echo "No package manager found."
+ fi
+}
+
+# Install any user packages specified in the USER_PACKAGES environment variable
+function install_user_packages() {
+ if [ ! -z "$USER_PACKAGES" ]; then
+ install_packages $USER_PACKAGES
+ fi
+}
+
+function install_zookeeper() {
+ zk_tar_url=http://www.apache.org/dist/hadoop/zookeeper/zookeeper-$ZK_VERSION/zookeeper-$ZK_VERSION.tar.gz
+ zk_tar_file=`basename $zk_tar_url`
+ zk_tar_md5_file=`basename $zk_tar_url.md5`
+
+ curl="curl --retry 3 --silent --show-error --fail"
+ for i in `seq 1 3`;
+ do
+ $curl -O $zk_tar_url
+ $curl -O $zk_tar_url.md5
+ if md5sum -c $zk_tar_md5_file; then
+ break;
+ else
+ rm -f $zk_tar_file $zk_tar_md5_file
+ fi
+ done
+
+ if [ ! -e $zk_tar_file ]; then
+ echo "Failed to download $zk_tar_url. Aborting."
+ exit 1
+ fi
+
+ tar zxf $zk_tar_file -C /usr/local
+ rm -f $zk_tar_file $zk_tar_md5_file
+
+ echo "export ZOOKEEPER_HOME=$ZOOKEEPER_HOME" >> ~root/.bashrc
+ echo 'export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH' >> ~root/.bashrc
+}
+
+function configure_zookeeper() {
+ mkdir -p /mnt/zookeeper/logs
+ ln -s /mnt/zookeeper/logs /var/log/zookeeper
+ mkdir -p /var/log/zookeeper/txlog
+ mkdir -p $ZK_CONF_DIR
+ cp $ZOOKEEPER_HOME/conf/log4j.properties $ZK_CONF_DIR
+
+ sed -i -e "s|log4j.rootLogger=INFO, CONSOLE|log4j.rootLogger=INFO, ROLLINGFILE|" \
+ -e "s|log4j.appender.ROLLINGFILE.File=zookeeper.log|log4j.appender.ROLLINGFILE.File=/var/log/zookeeper/zookeeper.log|" \
+ $ZK_CONF_DIR/log4j.properties
+
+ # Ensure ZooKeeper starts on boot
+ cat > /etc/rc.local <<EOF
+ZOOCFGDIR=$ZK_CONF_DIR $ZOOKEEPER_HOME/bin/zkServer.sh start > /dev/null 2>&1 &
+EOF
+
+}
+
+register_auto_shutdown
+install_user_packages
+install_zookeeper
+configure_zookeeper
Modified: hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py?rev=896259&r1=896258&r2=896259&view=diff
==============================================================================
--- hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py (original)
+++ hadoop/common/trunk/src/contrib/cloud/src/py/hadoop/cloud/service.py Tue Jan 5 22:54:51 2010
@@ -71,14 +71,174 @@
new_env_strings.extend(env_strings)
self.env_strings = new_env_strings
-class HadoopService(object):
+
+class Service(object):
"""
- A HDFS and MapReduce service.
+ A general service that runs on a cluster.
"""
def __init__(self, cluster):
self.cluster = cluster
+ def get_service_code(self):
+ """
+ The code that uniquely identifies the service.
+ """
+ raise Exception("Unimplemented")
+
+ def list_all(self, provider):
+ """
+ Find and print all clusters running this type of service.
+ """
+ raise Exception("Unimplemented")
+
+ def list(self):
+ """
+ Find and print all the instances running in this cluster.
+ """
+ raise Exception("Unimplemented")
+
+ def launch_master(self, instance_template, config_dir, client_cidr):
+ """
+ Launch a "master" instance.
+ """
+ raise Exception("Unimplemented")
+
+ def launch_slaves(self, instance_template):
+ """
+ Launch "slave" instance.
+ """
+ raise Exception("Unimplemented")
+
+ def launch_cluster(self, instance_templates, config_dir, client_cidr):
+ """
+ Launch a cluster of instances.
+ """
+ raise Exception("Unimplemented")
+
+ def terminate_cluster(self, force=False):
+ self.cluster.print_status()
+ if not force and not self._prompt("Terminate all instances?"):
+ print "Not terminating cluster."
+ else:
+ print "Terminating cluster"
+ self.cluster.terminate()
+
+ def delete_cluster(self):
+ self.cluster.delete()
+
+ def create_formatted_snapshot(self, size, availability_zone,
+ image_id, key_name, ssh_options):
+ Ec2Storage.create_formatted_snapshot(self.cluster, size,
+ availability_zone,
+ image_id,
+ key_name,
+ ssh_options)
+
+ def list_storage(self):
+ storage = self.cluster.get_storage()
+ storage.print_status()
+
+ def create_storage(self, role, number_of_instances,
+ availability_zone, spec_file):
+ storage = self.cluster.get_storage()
+ storage.create(role, number_of_instances, availability_zone, spec_file)
+ storage.print_status()
+
+ def attach_storage(self, role):
+ storage = self.cluster.get_storage()
+ storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
+ storage.print_status()
+
+ def delete_storage(self, force=False):
+ storage = self.cluster.get_storage()
+ storage.print_status()
+ if not force and not self._prompt("Delete all storage volumes? THIS WILL \
+ PERMANENTLY DELETE ALL DATA"):
+ print "Not deleting storage volumes."
+ else:
+ print "Deleting storage"
+ for role in storage.get_roles():
+ storage.delete(role)
+
+ def login(self, ssh_options):
+ raise Exception("Unimplemented")
+
+ def proxy(self, ssh_options):
+ raise Exception("Unimplemented")
+
+ def push(self, ssh_options, file):
+ raise Exception("Unimplemented")
+
+ def execute(self, ssh_options, args):
+ raise Exception("Unimplemented")
+
+ def update_slaves_file(self, config_dir, ssh_options, private_key):
+ raise Exception("Unimplemented")
+
+ def _prompt(self, prompt):
+ """ Returns true if user responds "yes" to prompt. """
+ return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
+
+ def _call(self, command):
+ print command
+ try:
+ subprocess.call(command, shell=True)
+ except Exception, e:
+ print e
+
+ def _get_default_user_data_file_template(self):
+ data_path = os.path.join(os.path.dirname(__file__), 'data')
+ return os.path.join(data_path, '%s-%s-init-remote.sh' %
+ (self.get_service_code(), self.cluster.get_provider_code()))
+
+ def _launch_instances(self, instance_template):
+ it = instance_template
+ user_data_file_template = it.user_data_file_template
+ if it.user_data_file_template == None:
+ user_data_file_template = self._get_default_user_data_file_template()
+ ebs_mappings = ''
+ storage = self.cluster.get_storage()
+ for role in it.roles:
+ if storage.has_any_storage((role,)):
+ ebs_mappings = storage.get_mappings_string_for_role(role)
+ replacements = { "%ENV%": build_env_string(it.env_strings, {
+ "ROLES": ",".join(it.roles),
+ "USER_PACKAGES": it.user_packages,
+ "AUTO_SHUTDOWN": it.auto_shutdown,
+ "EBS_MAPPINGS": ebs_mappings,
+ }) }
+ instance_user_data = InstanceUserData(user_data_file_template, replacements)
+ instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
+ it.size_id,
+ instance_user_data,
+ key_name=it.key_name,
+ public_key=it.public_key,
+ placement=it.placement)
+ print "Waiting for %s instances in role %s to start" % \
+ (it.number, ",".join(it.roles))
+ try:
+ self.cluster.wait_for_instances(instance_ids)
+ print "%s instances started" % ",".join(it.roles)
+ except TimeoutException:
+ print "Timeout while waiting for %s instance to start." % ",".join(it.roles)
+ return
+ print
+ self.cluster.print_status(it.roles[0])
+ return self.cluster.get_instances_in_role(it.roles[0], "running")
+
+
+class HadoopService(Service):
+ """
+ A HDFS and MapReduce service.
+ """
+
+ def __init__(self, cluster):
+ super(HadoopService, self).__init__(cluster)
+
+ def get_service_code(self):
+ return "hadoop"
+
def list_all(self, provider):
"""
Find and print clusters that have a running namenode instances
@@ -160,14 +320,6 @@
file, master.public_ip),
shell=True)
- def push(self, ssh_options, file):
- master = self._get_master()
- if not master:
- sys.exit(1)
- subprocess.call('scp %s -r %s root@%s:' % (xstr(ssh_options),
- file, master.public_ip),
- shell=True)
-
def execute(self, ssh_options, args):
master = self._get_master()
if not master:
@@ -176,51 +328,6 @@
master.public_ip,
" ".join(args)), shell=True)
- def terminate_cluster(self, force=False):
- self.cluster.print_status()
- if not force and not self._prompt("Terminate all instances?"):
- print "Not terminating cluster."
- else:
- print "Terminating cluster"
- self.cluster.terminate()
-
- def delete_cluster(self):
- self.cluster.delete()
-
- def create_formatted_snapshot(self, size, availability_zone,
- image_id, key_name, ssh_options):
- Ec2Storage.create_formatted_snapshot(self.cluster, size,
- availability_zone,
- image_id,
- key_name,
- ssh_options)
-
- def list_storage(self):
- storage = self.cluster.get_storage()
- storage.print_status()
-
- def create_storage(self, role, number_of_instances,
- availability_zone, spec_file):
- storage = self.cluster.get_storage()
- storage.create(role, number_of_instances, availability_zone, spec_file)
- storage.print_status()
-
- def attach_storage(self, role):
- storage = self.cluster.get_storage()
- storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
- storage.print_status()
-
- def delete_storage(self, force=False):
- storage = self.cluster.get_storage()
- storage.print_status()
- if not force and not self._prompt("Delete all storage volumes? THIS WILL \
- PERMANENTLY DELETE ALL DATA"):
- print "Not deleting storage volumes."
- else:
- print "Deleting storage"
- for role in storage.get_roles():
- storage.delete(role)
-
def update_slaves_file(self, config_dir, ssh_options, private_key):
instances = self.cluster.check_running(NAMENODE, 1)
if not instances:
@@ -241,16 +348,6 @@
subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
(ssh_options, private_key, slave.public_ip), shell=True)
- def _prompt(self, prompt):
- """ Returns true if user responds "yes" to prompt. """
- return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
-
-
- def _get_default_user_data_file_template(self):
- data_path = os.path.join(os.path.dirname(__file__), 'data')
- return os.path.join(data_path, 'hadoop-%s-init-remote.sh' %
- self.cluster.get_provider_code())
-
def _get_master(self):
# For split namenode/jobtracker, designate the namenode as the master
return self._get_namenode()
@@ -288,42 +385,6 @@
"""Replace characters in role name with ones allowed in bash variable
names"""
return role.replace('+', '_').upper()
- def _launch_instances(self, instance_template):
- it = instance_template
- user_data_file_template = it.user_data_file_template
- if it.user_data_file_template == None:
- user_data_file_template = self._get_default_user_data_file_template()
- ebs_mappings = ''
- storage = self.cluster.get_storage()
- for role in it.roles:
- if storage.has_any_storage((role,)):
- ebs_mappings = storage.get_mappings_string_for_role(role)
- replacements = { "%ENV%": build_env_string(it.env_strings, {
- "ROLES": ",".join(it.roles),
- "USER_PACKAGES": it.user_packages,
- "AUTO_SHUTDOWN": it.auto_shutdown,
- "EBS_MAPPINGS": ebs_mappings,
- }) }
- instance_user_data = InstanceUserData(user_data_file_template, replacements)
- instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
- it.size_id,
- instance_user_data,
- key_name=it.key_name,
- public_key=it.public_key,
- placement=it.placement,
- security_groups=it.security_groups)
- print "Waiting for %s instances in role %s to start" % \
- (it.number, ",".join(it.roles))
- try:
- self.cluster.wait_for_instances(instance_ids)
- print "%s instances started" % ",".join(it.roles)
- except TimeoutException:
- print "Timeout while waiting for %s instance to start." % ",".join(it.roles)
- return
- print
- self.cluster.print_status(it.roles[0])
- return self.cluster.get_instances_in_role(it.roles[0], "running")
-
def _authorize_client_ports(self, client_cidrs=[]):
if not client_cidrs:
logger.debug("No client CIDRs specified, using local address.")
@@ -463,4 +524,111 @@
time.sleep(10)
for role in roles:
storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
- storage.print_status(roles)
\ No newline at end of file
+ storage.print_status(roles)
+
+
+class ZooKeeperService(Service):
+ """
+ A ZooKeeper service.
+ """
+
+ ZOOKEEPER_ROLE = "zk"
+
+ def __init__(self, cluster):
+ super(ZooKeeperService, self).__init__(cluster)
+
+ def get_service_code(self):
+ return "zookeeper"
+
+ def launch_cluster(self, instance_templates, config_dir, client_cidr):
+ self._launch_cluster_instances(instance_templates)
+ self._authorize_client_ports(client_cidr)
+ self._update_cluster_membership(instance_templates[0].public_key)
+
+ def _launch_cluster_instances(self, instance_templates):
+ for instance_template in instance_templates:
+ instances = self._launch_instances(instance_template)
+
+ def _authorize_client_ports(self, client_cidrs=[]):
+ if not client_cidrs:
+ logger.debug("No client CIDRs specified, using local address.")
+ client_ip = url_get('http://checkip.amazonaws.com/').strip()
+ client_cidrs = ("%s/32" % client_ip,)
+ logger.debug("Client CIDRs: %s", client_cidrs)
+ for client_cidr in client_cidrs:
+ self.cluster.authorize_role(self.ZOOKEEPER_ROLE, 2181, 2181, client_cidr)
+
+ def _update_cluster_membership(self, public_key):
+ time.sleep(30) # wait for SSH daemon to start
+
+ ssh_options = '-o StrictHostKeyChecking=no'
+ private_key = public_key[:-4] # TODO: pass in private key explicitly
+
+ instances = self.cluster.get_instances_in_role(self.ZOOKEEPER_ROLE,
+ 'running')
+ config_file = 'zoo.cfg'
+ with open(config_file, 'w') as f:
+ f.write("""# The number of milliseconds of each tick
+tickTime=2000
+# The number of ticks that the initial
+# synchronization phase can take
+initLimit=10
+# The number of ticks that can pass between
+# sending a request and getting an acknowledgement
+syncLimit=5
+# The directory where the snapshot is stored.
+dataDir=/var/log/zookeeper/txlog
+# The port at which the clients will connect
+clientPort=2181
+# The servers in the ensemble
+""")
+ counter = 1
+ for i in instances:
+ f.write("server.%s=%s:2888:3888\n" % (counter, i.private_ip))
+ counter += 1
+ # copy to each node in the cluster
+ myid_file = 'myid'
+ counter = 1
+ for i in instances:
+ self._call('scp -i %s %s %s root@%s:/etc/zookeeper/conf/zoo.cfg' \
+ % (private_key, ssh_options, config_file, i.public_ip))
+ with open(myid_file, 'w') as f:
+ f.write(str(counter) + "\n")
+ self._call('scp -i %s %s %s root@%s:/var/log/zookeeper/txlog/myid' \
+ % (private_key, ssh_options, myid_file, i.public_ip))
+ counter += 1
+ os.remove(config_file)
+ os.remove(myid_file)
+
+ # start the zookeeper servers
+ for i in instances:
+ self._call('ssh -i %s %s root@%s nohup /etc/rc.local &' \
+ % (private_key, ssh_options, i.public_ip))
+
+ hosts_string = ",".join(["%s:2181" % i.public_ip for i in instances])
+ print "ZooKeeper cluster: %s" % hosts_string
+
+SERVICE_PROVIDER_MAP = {
+ "hadoop": {
+ # "provider_code": ('hadoop.cloud.providers.provider_code', 'ProviderHadoopService')
+ },
+ "zookeeper": {
+ # "provider_code": ('hadoop.cloud.providers.provider_code', 'ProviderZooKeeperService')
+ },
+}
+
+DEFAULT_SERVICE_PROVIDER_MAP = {
+ "hadoop": HadoopService,
+ "zookeeper": ZooKeeperService
+}
+
+def get_service(service, provider):
+ """
+ Retrieve the Service class for a service and provider.
+ """
+ try:
+ mod_name, service_classname = SERVICE_PROVIDER_MAP[service][provider]
+ _mod = __import__(mod_name, globals(), locals(), [service_classname])
+ return getattr(_mod, service_classname)
+ except KeyError:
+ return DEFAULT_SERVICE_PROVIDER_MAP[service]