Repository: helix Updated Branches: refs/heads/master 01222c4f6 -> 80a4a13fd
Refactoring cluster.py: Changing from module to class based functions. Abstracting Cluster object. Adding ZKCluster object. functions.py: Put all functions inside a class. Switched to using a class variable for host selection. zkfunctions.py: Adding support for direct zookeeper access. statemodeldefs.py: Constants for state model definitions. test/test_helix.py: Unit tests for both Rest object and zookeeper object. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a714f002 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a714f002 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a714f002 Branch: refs/heads/master Commit: a714f002cd2d9dd98bd86723da787c94f0d70ee7 Parents: 01222c4 Author: Casey Miller <[email protected]> Authored: Fri Feb 13 19:43:24 2015 -0800 Committer: Casey Miller <[email protected]> Committed: Fri Feb 13 19:43:24 2015 -0800 ---------------------------------------------------------------------- contributors/py-helix-admin/helix/cluster.py | 91 +- contributors/py-helix-admin/helix/functions.py | 925 +++++++++---------- .../py-helix-admin/helix/statemodeldefs.py | 39 + .../py-helix-admin/helix/test/test_helix.py | 60 ++ .../py-helix-admin/helix/zkfunctions.py | 522 +++++++++++ 5 files changed, 1111 insertions(+), 526 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/cluster.py ---------------------------------------------------------------------- diff --git a/contributors/py-helix-admin/helix/cluster.py b/contributors/py-helix-admin/helix/cluster.py index 7db7014..175bff8 100644 --- a/contributors/py-helix-admin/helix/cluster.py +++ b/contributors/py-helix-admin/helix/cluster.py @@ -23,17 +23,21 @@ from partition import Partition from resourcegroup import ResourceGroup from helixexceptions import HelixException -import functions +from functions import RestHelixFunctions +try: + from zkfunctions import ZookeeperHelixFunctions + zookeeper_ok = True +except ImportError: + zookeeper_ok = False -class Cluster(object): +class BaseCluster(object): """Basic model of a cluster, holds participants, partitions, slices, external view, ideal state, resource groups""" ver = (1, 0) - def __init__(self, host, cluster): - super(Cluster, self).__init__() - self.host = host + def __init__(self, cluster): + super(BaseCluster, self).__init__() self.cluster = cluster # dynamically loaded data below @@ -48,15 +52,13 @@ class Cluster(object): self.cluster) def __repr__(self): - return "{0}({1}, {2})".format(self.__class__.__name__, self.cluster, - self.host) + return "{0}({1}, {2})".format(self.__class__.__name__, self.cluster) def load_resources(self): """queries helix for resource groups and loades them into model""" try: - for cur_resource in functions.get_resource_groups(self.host, - self.cluster): - data = functions.get_resource_group(self.host, self.cluster, + for cur_resource in self.functions.get_resource_groups(self.cluster): + data = self.functions.get_resource_group(self.cluster, cur_resource) name = data["id"] count = data["simpleFields"]["NUM_PARTITIONS"] @@ -94,7 +96,7 @@ class Cluster(object): def _cluster_exists(self): """verify cluster exists in helix""" - if self.cluster in functions.get_clusters(self.host): + if self.cluster in self.functions.get_clusters(): return True return False @@ -103,7 +105,7 @@ class Cluster(object): self._participants = {} try: - instances = functions.get_instances(self.host, self.cluster) + instances = self.functions.get_instances(self.cluster) for instance in instances: ident = instance["id"] enabled = instance["simpleFields"]["HELIX_ENABLED"] @@ -129,7 +131,7 @@ class Cluster(object): """query partitions from helix and load into model""" self._partitions = {} for resource in self.resources: - newstate = functions.get_ideal_state(self.host, self.cluster, + newstate = self.functions.get_ideal_state(self.cluster, resource) self._partitions[resource] = {} if newstate: @@ -152,7 +154,7 @@ class Cluster(object): self._ideal_state = {} for resource in self.resources: self._ideal_state[resource] = \ - functions.get_ideal_state(self.host, self.cluster, resource) + self.functions.get_ideal_state(self.cluster, resource) @property def ideal_state(self): @@ -171,7 +173,7 @@ class Cluster(object): self._external_view = {} for resource in self.resources: self._external_view[resource] = \ - functions.get_external_view(self.host, self.cluster, resource) + self.functions.get_external_view(self.cluster, resource) @property def external_view(self): @@ -187,18 +189,18 @@ class Cluster(object): def get_config(self, config): """ get requested config from helix""" - return functions.get_config(self.host, self.cluster, config) + return self.functions.get_config(self.cluster, config) def set_cluster_config(self, config): """ set given configs in helix""" - return functions.set_config(self.host, self.cluster, config) + return self.functions.set_config(self.cluster, config) def set_resource_config(self, config, resource): """ set given configs in helix""" rname = resource if isinstance(resource, ResourceGroup): rname = resource.name - return functions.set_config(self.host, self.cluster, config, + return self.functions.set_config(self.cluster, config, resource=rname) def set_participant_config(self, config, participant): @@ -206,34 +208,34 @@ class Cluster(object): if isinstance(participant, Participant): pname = participant.ident """ set given configs in helix""" - return functions.set_config(self.host, self.cluster, config, + return self.functions.set_config(self.cluster, config, participant=pname) def activate_cluster(self, grand, enabled=True): """activate this cluster with the specified grand cluster""" - return functions.activate_cluster(self.host, self.cluster, grand, + return self.functions.activate_cluster(self.cluster, grand, enabled) def deactivate_cluster(self, grand): """deactivate this cluster against the given grandcluster""" - return functions.deactivate_cluster(self.host, self.cluster, grand) + return self.functions.deactivate_cluster(self.cluster, grand) def add_cluster(self): """add cluster to helix""" - return functions.add_cluster(self.host, self.cluster) + return self.functions.add_cluster(self.cluster) def add_instance(self, instances, port): """add instance to cluster""" - return functions.add_instance(self.host, self.cluster, instances, port) + return self.functions.add_instance(self.cluster, instances, port) def rebalance(self, resource, replicas, key=""): """rebalance a resource group""" - return functions.rebalance(self.host, self.cluster, resource, + return self.functions.rebalance(self.cluster, resource, replicas, key) def add_resource(self, resource, partitions, state_model_def, mode=""): """add resource to cluster""" - return functions.add_resource(self.host, self.cluster, resource, + return self.functions.add_resource(self.cluster, resource, partitions, state_model_def, mode) def enable_instance(self, instance, enabled=True): @@ -245,7 +247,7 @@ class Cluster(object): ident = instance else: raise HelixException("Instance must be a string or participant") - return functions.enable_instance(self.host, self.cluster, ident, + return self.functions.enable_instance(self.cluster, ident, enabled) def disable_instance(self, instance): @@ -272,7 +274,7 @@ class Cluster(object): else: raise HelixException("Partition must be a string or partition") - return functions.enable_partition(self.host, self.cluster, resource, + return self.functions.enable_partition(self.cluster, resource, part_id, ident, enabled) def disable_partition(self, resource, partition, instance): @@ -291,7 +293,7 @@ class Cluster(object): raise HelixException( "Resource must be a string or a resource group object") - return functions.enable_resource(self.host, self.cluster, + return self.functions.enable_resource(self.cluster, resource_name, enabled) def disable_resource(self, resource): @@ -308,7 +310,7 @@ class Cluster(object): else: raise HelixException("Resource must be resource object or string") - return functions.add_resource_tag(self.host, self.cluster, + return self.functions.add_resource_tag(self.cluster, resource_name, tag) # del resource not yet available in api @@ -322,7 +324,7 @@ class Cluster(object): # else: # raise HelixException("Resource must be resource object or str") # - # return functions.del_resource_tag(self.host, self.cluster, + # return self.functions.del_resource_tag(self.cluster, # resource_name, tag) def add_instance_tag(self, instance, tag): @@ -335,7 +337,7 @@ class Cluster(object): else: raise HelixException("Instance must be a string or participant") - return functions.add_instance_tag(self.host, self.cluster, ident, tag) + return self.functions.add_instance_tag(self.cluster, ident, tag) def del_instance_tag(self, instance, tag): ident = None @@ -347,18 +349,37 @@ class Cluster(object): else: raise HelixException("Instance must be a string or participant") - return functions.del_instance_tag(self.host, self.cluster, ident, tag) + return self.functions.del_instance_tag(self.cluster, ident, tag) def del_instance(self, instance): """remove instance from cluster, assumes instance is a participant object""" - return functions.del_instance(self.host, self.cluster, instance.ident) + return self.functions.del_instance(self.cluster, instance.ident) def del_resource(self, resource): """remove resource group from cluster, assumes resource is a resource object""" - return functions.del_resource(self.host, self.cluster, resource.name) + return self.functions.del_resource(self.cluster, resource.name) def del_cluster(self): """remove cluster from helix""" - return functions.del_cluster(self.host, self.cluster) + return self.functions.del_cluster(self.cluster) + +class Cluster(BaseCluster): + def __init__(self, host, cluster): + super(Cluster, self).__init__(cluster) + self.host = host + self.functions = RestHelixFunctions(host) + + +class ZKCluster(BaseCluster): + def __init__(self, zookeeper_connect_string, zookeeper_root, cluster): + super(ZKCluster, self).__init__(cluster) + + # We want to fail if kazoo cannot be found, but only if using the zookeeper object. + if not zookeeper_ok: + raise ImportError + + self.zookeeper_connect_string = zookeeper_connect_string + self.zookeeper_root = zookeeper_root + self.functions = ZookeeperHelixFunctions(self.zookeeper_connect_string, self.zookeeper_root) http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/functions.py ---------------------------------------------------------------------- diff --git a/contributors/py-helix-admin/helix/functions.py b/contributors/py-helix-admin/helix/functions.py index 5e36404..9221aca 100644 --- a/contributors/py-helix-admin/helix/functions.py +++ b/contributors/py-helix-admin/helix/functions.py @@ -25,538 +25,481 @@ from helixexceptions import HelixException from helixexceptions import HelixAlreadyExistsException from helixexceptions import HelixDoesNotExistException - -def _post_payload(host, path, data, **kwargs): - """generic function to handle posting data - :rtype : return body of page - :param host: host to send data to - :param path: path to interact with - :param data: data to send - :param kwargs: additional keyword args - """ - - if "http://" not in host: - host = "http://{0}".format(host) - - res = Resource(host) - - payload = "jsonParameters={0}".format(json.dumps(data)) - for key, value in kwargs.items(): - payload += '&{0}={1}'.format(key, json.dumps(value)) - headers = {"Content-Type": "application/json"} - # print "path is %s" % path - page = res.post(path=path, payload=payload, headers=headers) - body = page.body_string() - if body: - body = json.loads(body) +class RestHelixFunctions: + def __init__(self, host): + if "http://" not in host: + self.host = "http://{0}".format(host) + else: + self.host = host + + def _post_payload(self, path, data, **kwargs): + """generic function to handle posting data + :rtype : return body of page + :param path: path to interact with + :param data: data to send + :param kwargs: additional keyword args + """ + + res = Resource(self.host) + + payload = "jsonParameters={0}".format(json.dumps(data)) + for key, value in kwargs.items(): + payload += '&{0}={1}'.format(key, json.dumps(value)) + headers = {"Content-Type": "application/json"} + # print "path is %s" % path + page = res.post(path=path, payload=payload, headers=headers) + body = page.body_string() + if body: + body = json.loads(body) + + if isinstance(body, dict) and "ERROR" in body: + raise HelixException(body["ERROR"]) + + # test what was returned, see if any exceptions need to be raise + # if not body: + # raise HelixException("body for path {0} is empty".format(path)) + # else: + # print "BODY IS EMPTY FOR ", path + # print "BODY is %s." % body + + return body + + + def _get_page(self, path): + """if we're specifying a cluster then verify that a cluster is set""" + + res = Resource(self.host) + + page = res.get(path=path) + data = page.body_string() + body = None + try: + body = json.loads(data) + except ValueError: + body = json.loads(data[:-3]) + + # test what was returned, see if any exceptions need to be raise + if not body: + raise HelixException("body for path {0} is empty".format(path)) if isinstance(body, dict) and "ERROR" in body: raise HelixException(body["ERROR"]) - # test what was returned, see if any exceptions need to be raise - # if not body: - # raise HelixException("body for path {0} is empty".format(path)) - # else: - # print "BODY IS EMPTY FOR ", path - # print "BODY is %s." % body - - return body - - -def _get_page(host, path): - """if we're specifying a cluster then verify that a cluster is set""" - - if "http://" not in host: - host = "http://{0}".format(host) - - res = Resource(host) - - page = res.get(path=path) - data = page.body_string() - body = None - try: - body = json.loads(data) - except ValueError: - body = json.loads(data[:-3]) - - # test what was returned, see if any exceptions need to be raise - if not body: - raise HelixException("body for path {0} is empty".format(path)) - - if isinstance(body, dict) and "ERROR" in body: - raise HelixException(body["ERROR"]) - - return body - + return body -def _delete_page(host, path): - """delete page at a given path""" - retval = None - if "http://" not in host: - host = "http://{0}".format(host) - res = Resource(host) + def _delete_page(self, path): + """delete page at a given path""" + retval = None - page = res.delete(path) - data = page.body_string() - if data: - retval = json.loads(data) + res = Resource(self.host) - return retval + page = res.delete(path) + data = page.body_string() + if data: + retval = json.loads(data) + return retval -def get_clusters(host): - """ querys helix cluster for all clusters """ - return _get_page(host, "/clusters")["listFields"]["clusters"] + def get_clusters(self): + """ querys helix cluster for all clusters """ + return self._get_page("/clusters")["listFields"]["clusters"] -def get_resource_groups(host, cluster): - """ querys helix cluster for resources groups of the current cluster""" - return _get_page(host, "/clusters/{0}/resourceGroups".format(cluster))[ - "listFields"]["ResourceGroups"] + def get_resource_groups(self, cluster): + """ querys helix cluster for resources groups of the current cluster""" + return self._get_page("/clusters/{0}/resourceGroups".format(cluster))[ + "listFields"]["ResourceGroups"] -def get_resource_tags(host, cluster): - """returns a dict of resource tags for a cluster""" - return _get_page(host, "/clusters/{0}/resourceGroups".format(cluster))[ - "mapFields"]["ResourceTags"] + def get_resource_tags(self, cluster): + """returns a dict of resource tags for a cluster""" + return self._get_page("/clusters/{0}/resourceGroups".format(cluster))[ + "mapFields"]["ResourceTags"] -def get_resource_group(host, cluster, resource): - """ gets the ideal state of the specified resource group of the - current cluster""" - if resource not in get_resource_groups(host, cluster): - raise HelixException( - "{0} is not a resource group of {1}".format(resource, cluster)) - return _get_page(host, "/clusters/{0}/resourceGroups/{1}".format(cluster, - resource)) + def get_resource_group(self, cluster, resource): + """ gets the ideal state of the specified resource group of the + current cluster""" + if resource not in self.get_resource_groups(cluster): + raise HelixException( + "{0} is not a resource group of {1}".format(resource, cluster)) + return self._get_page("/clusters/{0}/resourceGroups/{1}".format(cluster, + resource)) -def get_ideal_state(host, cluster, resource): - """ gets the ideal state of the specified resource group of the - current cluster""" + def get_ideal_state(self, cluster, resource): + """ gets the ideal state of the specified resource group of the + current cluster""" - if resource not in get_resource_groups(host, cluster): - raise HelixException( - "{0} is not a resource group of {1}".format(resource, cluster)) + if resource not in self.get_resource_groups(cluster): + raise HelixException( + "{0} is not a resource group of {1}".format(resource, cluster)) - return _get_page(host, "/clusters/{0}/resourceGroups/{1}/idealState". - format(cluster, resource))["mapFields"] + return self._get_page("/clusters/{0}/resourceGroups/{1}/idealState". + format(cluster, resource))["mapFields"] + def get_external_view(self, cluster, resource): + """return the external view for a given cluster and resource""" + if resource not in self.get_resource_groups(cluster): + raise HelixException( + "{0} is not a resource group of {1}".format(resource, cluster)) -def get_external_view(host, cluster, resource): - """return the external view for a given cluster and resource""" - if resource not in get_resource_groups(host, cluster): - raise HelixException( - "{0} is not a resource group of {1}".format(resource, cluster)) + return self._get_page("/clusters/{0}/resourceGroups/{1}/externalView".format( + cluster, resource))["mapFields"] - return _get_page(host, - "/clusters/{0}/resourceGroups/{1}/externalView".format( - cluster, resource))["mapFields"] + def get_instances(self, cluster): + """get list of instances registered to the cluster""" + if not cluster: + raise HelixException("Cluster must be set before " + "calling this function") + return self._get_page("/clusters/{0}/instances".format(cluster))[ + "instanceInfo"] -def get_instances(host, cluster): - """get list of instances registered to the cluster""" - if not cluster: - raise HelixException("Cluster must be set before " - "calling this function") + def get_instance_detail(self, cluster, name): + """get details of an instance""" + return self._get_page("/clusters/{0}/instances/{1}".format(cluster, name)) - return _get_page(host, "/clusters/{0}/instances".format(cluster))[ - "instanceInfo"] + def get_config(self, cluster, config): + """get requested config""" + return self._get_page("/clusters/{0}/configs/{1}".format(cluster, config)) + def add_cluster(self, cluster): + """add a cluster to helix""" + if cluster in self.get_clusters(): + raise HelixAlreadyExistsException( + "Cluster {0} already exists".format(cluster)) -def get_instance_detail(host, cluster, name): - """get details of an instance""" - return _get_page(host, "/clusters/{0}/instances/{1}".format(cluster, name)) + data = {"command": "addCluster", + "clusterName": cluster} - -def get_config(host, cluster, config): - """get requested config""" - return _get_page(host, "/clusters/{0}/configs/{1}".format(cluster, config)) - - -def add_cluster(host, cluster): - """add a cluster to helix""" - if cluster in get_clusters(host): - raise HelixAlreadyExistsException( - "Cluster {0} already exists".format(cluster)) - - data = {"command": "addCluster", - "clusterName": cluster} - - page = _post_payload(host, "/clusters", data) - return page - - -def add_instance(host, cluster, instances, port): - """add a list of instances to a cluster""" - if cluster not in get_clusters(host): - raise HelixDoesNotExistException( - "Cluster {0} does not exist".format(cluster)) - - if not isinstance(instances, list): - instances = [instances] - instances = ["{0}:{1}".format(instance, port) for instance in instances] - try: - newinstances = set(instances) - oldinstances = set( - [x["id"].replace('_', ':') for x in get_instances(host, cluster)]) - instances = list(newinstances - oldinstances) - except HelixException: - # this will get thrown if instances is empty, - # which if we're just populating should happen - pass - - if instances: - data = {"command": "addInstance", - "instanceNames": ";".join(instances)} - - instance_path = "/clusters/{0}/instances".format(cluster) - # print "adding to", instance_path - page = _post_payload(host, instance_path, data) + page = self._post_payload("/clusters", data) return page - else: - raise HelixAlreadyExistsException( - "All instances given already exist in cluster") - - -def rebalance(host, cluster, resource, replicas, key=""): - """rebalance the given resource group""" - if resource not in get_resource_groups(host, cluster): - raise HelixException( - "{0} is not a resource group of {1}".format(resource, cluster)) - - data = {"command": "rebalance", - "replicas": replicas} - - if key: - data["key"] = key - page = _post_payload(host, - "/clusters/{0}/resourceGroups/{1}/idealState".format( - cluster, resource), data) - return page - - -def activate_cluster(host, cluster, grand_cluster, enabled=True): - """activate the cluster with the grand cluster""" - if grand_cluster not in get_clusters(host): - raise HelixException( - "grand cluster {0} does not exist".format(grand_cluster)) - - data = {'command': 'activateCluster', - 'grandCluster': grand_cluster} - - if enabled: - data["enabled"] = "true" - else: - data["enabled"] = "false" - - page = _post_payload(host, "/clusters/{0}".format(cluster), data) - return page - - -def deactivate_cluster(host, cluster, grand_cluster): - """deactivate the cluster with the grand cluster""" - return activate_cluster(host, cluster, grand_cluster, enabled=False) - - -def add_resource(host, cluster, resource, partitions, - state_model_def, mode=""): - """Add given resource group""" - if resource in get_resource_groups(host, cluster): - raise HelixAlreadyExistsException( - "ResourceGroup {0} already exists".format(resource)) - - data = {"command": "addResource", - "resourceGroupName": resource, - "partitions": partitions, - "stateModelDefRef": state_model_def} - - if mode: - data["mode"] = mode - - return _post_payload(host, "/clusters/{0}/resourceGroups".format(cluster), - data) - - -def enable_resource(host, cluster, resource, enabled=True): - """enable or disable specified resource""" - data = {"command": "enableResource"} - if enabled: - data["enabled"] = "true" - else: - data["enabled"] = "false" - - return _post_payload(host, "/clusters/{0}/resourceGroups/{1}".format( - cluster, resource), data) - - -def disable_resource(host, cluster, resource): - """function for disabling resources""" - return enable_resource(host, cluster, resource, enabled=False) - - -def alter_ideal_state(host, cluster, resource, newstate): - """alter ideal state""" - data = {"command": "alterIdealState"} - return _post_payload(host, - "/clusters/{0}/resourceGroups/{1}/idealState".format( - cluster, resource), data, - newIdealState=newstate) - - -def enable_instance(host, cluster, instance, enabled=True): - """enable instance within cluster""" - data = {"command": "enableInstance"} - if enabled: - data["enabled"] = "true" - else: - data["enabled"] = "false" - - return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster, - instance), - data) - - -def disable_instance(host, cluster, instance): - """wrapper for ease of use for disabling an instance""" - return enable_instance(host, cluster, instance, enabled=False) - - -def swap_instance(host, cluster, old, new): - """swap instance""" - data = {"command": "swapInstance", - "oldInstance": old, - "newInstance": new} - - return _post_payload(host, "/cluster/{0}/instances".format(cluster), data) - - -def enable_partition(host, cluster, resource, partition, instance, - enabled=True): - """enable Partition """ - if resource not in get_resource_groups(host, cluster): - raise HelixDoesNotExistException( - "ResourceGroup {0} does not exist".format(resource)) - - data = {"command": "enablePartition", - "resource": resource, - "partition": partition, - "enabled": enabled} - return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster, - instance), - data) - - -def disable_partition(host, cluster, resource, partitions, instance): - """disable Partition """ - return enable_partition(host, cluster, resource, partitions, instance, - enabled=False) - - -def reset_partition(host, cluster, resource, partitions, instance): - """reset partition""" - if resource not in get_resource_groups(host, cluster): - raise HelixDoesNotExistException( - "ResourceGroup {0} does not exist".format(resource)) - - data = {"command": "resetPartition", - "resource": resource, - "partition": " ".join(partitions)} - return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster, - instance), - data) - - -def reset_resource(host, cluster, resource): - """reset resource""" - if resource not in get_resource_groups(host, cluster): - raise HelixDoesNotExistException( - "ResourceGroup {0} does not exist".format(resource)) - - data = {"command": "resetResource"} - return _post_payload(host, - "/clusters/{0}/resourceGroups/{1}".format(cluster, - resource), - data) - - -def reset_instance(host, cluster, instance): - """reset instance""" - if instance not in get_instances(host, cluster): - raise HelixDoesNotExistException( - "Instance {0} does not exist".format(instance)) - - data = {"command": "resetInstance"} - return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster, - instance), - data) - - -def add_instance_tag(host, cluster, instance, tag): - """add tag to an instance""" - data = {"command": "addInstanceTag", - "instanceGroupTag": tag} - return _post_payload(host, - "/clusters/{0}/instances/{1}".format( - cluster, instance), data) - - -def del_instance_tag(host, cluster, instance, tag): - """remove tag from instance""" - data = {"command": "removeInstanceTag", - "instanceGroupTag": tag} - return _post_payload(host, - "/clusters/{0}/instances/{1}".format( - cluster, instance), data) - - -def add_resource_tag(host, cluster, resource, tag): - """add tag to resource group""" - if resource not in get_resource_groups(host, cluster): - raise HelixDoesNotExistException( - "ResourceGroup {0} does not exist".format(resource)) - - data = {"command": "addResourceProperty", - "INSTANCE_GROUP_TAG": tag} - return _post_payload(host, - "/clusters/{0}/resourceGroups/{1}/idealState".format( - cluster, resource), data) - - -""" -del resource currently does not exist in helix api -def del_resource_tag(host, cluster, resource, tag): - if resource not in get_resource_groups(host, cluster): - raise HelixDoesNotExistException( - "ResourceGroup {0} does not exist".format(resource)) - - data = {"command": "removeResourceProperty", - "INSTANCE_GROUP_TAG": tag} - return _post_payload(host, - "/clusters/{0}/resourceGroups/{1}/idealState".format( - cluster, resource), data) -""" - - -def get_instance_taginfo(host, cluster): - return _get_page(host, "/clusters/{0}/instances".format( - cluster))["tagInfo"] - - -def expand_cluster(host, cluster): - """expand cluster""" - data = {"command": "expandCluster"} - - return _post_payload(host, "/clusters/{0}/".format(cluster), data) - - -def expand_resource(host, cluster, resource): - """expand resource""" - data = {"command": "expandResource"} - - return _post_payload(host, - "/clusters/{0}/resourceGroup/{1}/idealState".format( - cluster, resource), data) - - -def add_resource_property(host, cluster, resource, properties): - """add resource property properties must be a dictionary of properties""" - properties["command"] = "addResourceProperty" - - return _post_payload(host, - "/clusters/{0}/resourceGroup/{1}/idealState".format( - cluster, resource), properties) - - -def _handle_config(host, cluster, configs, command, participant=None, - resource=None): - """helper function to set or delete configs in helix""" - data = {"command": "{0}Config".format(command), - "configs": ",".join( - ["{0}={1}".format(x, y) for x, y in configs.items()])} - - address = "/clusters/{0}/configs/".format(cluster) - if participant: - address += "participant/{0}".format(participant) - elif resource: - address += "resource/{0}".format(resource) - else: - address += "cluster" - - return _post_payload(host, address, data) - - -def set_config(host, cluster, configs, participant=None, resource=None): - """sets config in helix""" - return _handle_config(host, cluster, configs, "set", participant, resource) - - -def remove_config(host, cluster, configs, participant=None, resource=None): - """sets config in helix""" - return _handle_config(host, "remove", cluster, configs, participant, - resource) - - -def get_zk_path(host, path): - """get zookeeper path""" - return _get_page(host, "zkPath/{0}".format(path)) - - -def del_zk_path(host, path): - """delete zookeeper path""" - return _delete_page(host, "zkPath/{0}".format(path)) - - -def get_zk_child(host, path): - """get zookeeper child""" - return _get_page(host, "zkChild/{0}".format(path)) - - -def del_zk_child(host, path): - """delete zookeeper child""" - return _delete_page(host, "zkChild/{0}".format(path)) - - -def add_state_model(host, cluster, newstate): - """add state model""" - data = {"command": "addStateModel"} + def add_instance(self, cluster, instances, port): + """add a list of instances to a cluster""" + if cluster not in self.get_clusters(): + raise HelixDoesNotExistException( + "Cluster {0} does not exist".format(cluster)) + + if not isinstance(instances, list): + instances = [instances] + instances = ["{0}:{1}".format(instance, port) for instance in instances] + try: + newinstances = set(instances) + oldinstances = set( + [x["id"].replace('_', ':') for x in self.get_instances(cluster)]) + instances = list(newinstances - oldinstances) + except HelixException: + # this will get thrown if instances is empty, + # which if we're just populating should happen + pass + + if instances: + data = {"command": "addInstance", + "instanceNames": ";".join(instances)} + + instance_path = "/clusters/{0}/instances".format(cluster) + # print "adding to", instance_path + page = self._post_payload(instance_path, data) + return page + + else: + raise HelixAlreadyExistsException( + "All instances given already exist in cluster") + + def rebalance(self, cluster, resource, replicas, key=""): + """rebalance the given resource group""" + if resource not in self.get_resource_groups(cluster): + raise HelixException( + "{0} is not a resource group of {1}".format(resource, cluster)) + + data = {"command": "rebalance", + "replicas": replicas} + + if key: + data["key"] = key + page = self._post_payload("/clusters/{0}/resourceGroups/{1}/idealState".format( + cluster, resource), data) + return page - return _post_payload(host, "/clusters/{0}/StateModelDefs".format(cluster), - data, newStateModelDef=newstate) + def activate_cluster(self, cluster, grand_cluster, enabled=True): + """activate the cluster with the grand cluster""" + if grand_cluster not in self.get_clusters(): + raise HelixException( + "grand cluster {0} does not exist".format(grand_cluster)) + data = {'command': 'activateCluster', + 'grandCluster': grand_cluster} -def del_instance(host, cluster, instance): - """delete instance""" - if instance not in [x["id"] for x in get_instances(host, cluster)]: - raise HelixDoesNotExistException( - "Instance {0} does not exist.".format(instance)) + if enabled: + data["enabled"] = "true" + else: + data["enabled"] = "false" - page = _delete_page(host, - "/clusters/{0}/instances/{1}".format(cluster, - instance)) - return page + page = self._post_payload("/clusters/{0}".format(cluster), data) + return page + def deactivate_cluster(self, cluster, grand_cluster): + """deactivate the cluster with the grand cluster""" + return activate_cluster(cluster, grand_cluster, enabled=False) + + def add_resource(self, cluster, resource, partitions, state_model_def, mode=""): + """Add given resource group""" + if resource in self.get_resource_groups(cluster): + raise HelixAlreadyExistsException( + "ResourceGroup {0} already exists".format(resource)) + + data = {"command": "addResource", + "resourceGroupName": resource, + "partitions": partitions, + "stateModelDefRef": state_model_def} + + if mode: + data["mode"] = mode + + return self._post_payload("/clusters/{0}/resourceGroups".format(cluster), + data) + + def enable_resource(self, cluster, resource, enabled=True): + """enable or disable specified resource""" + data = {"command": "enableResource"} + if enabled: + data["enabled"] = "true" + else: + data["enabled"] = "false" + + return self._post_payload("/clusters/{0}/resourceGroups/{1}".format( + cluster, resource), data) + + def disable_resource(self, cluster, resource): + """function for disabling resources""" + return enable_resource(cluster, resource, enabled=False) + + def alter_ideal_state(self, cluster, resource, newstate): + """alter ideal state""" + data = {"command": "alterIdealState"} + return self._post_payload("/clusters/{0}/resourceGroups/{1}/idealState".format( + cluster, resource), data, + newIdealState=newstate) + + def enable_instance(self, cluster, instance, enabled=True): + """enable instance within cluster""" + data = {"command": "enableInstance"} + if enabled: + data["enabled"] = "true" + else: + data["enabled"] = "false" + + return self._post_payload("/clusters/{0}/instances/{1}".format(cluster, + instance), + data) + + def disable_instance(self, cluster, instance): + """wrapper for ease of use for disabling an instance""" + return enable_instance(cluster, instance, enabled=False) + + def swap_instance(self, cluster, old, new): + """swap instance""" + data = {"command": "swapInstance", + "oldInstance": old, + "newInstance": new} + + return self._post_payload("/cluster/{0}/instances".format(cluster), data) + + def enable_partition(self, cluster, resource, partition, instance, + enabled=True): + """enable Partition """ + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + + data = {"command": "enablePartition", + "resource": resource, + "partition": partition, + "enabled": enabled} + return self._post_payload("/clusters/{0}/instances/{1}".format(cluster, + instance), + data) + + def disable_partition(self, cluster, resource, partitions, instance): + """disable Partition """ + return enable_partition(cluster, resource, partitions, instance, + enabled=False) + + def reset_partition(self, cluster, resource, partitions, instance): + """reset partition""" + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + + data = {"command": "resetPartition", + "resource": resource, + "partition": " ".join(partitions)} + return self._post_payload("/clusters/{0}/instances/{1}".format(cluster, + instance), + data) + + def reset_resource(self, cluster, resource): + """reset resource""" + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + + data = {"command": "resetResource"} + return self._post_payload("/clusters/{0}/resourceGroups/{1}".format(cluster, + resource), + data) + + def reset_instance(self, cluster, instance): + """reset instance""" + if instance not in self.get_instances(cluster): + raise HelixDoesNotExistException( + "Instance {0} does not exist".format(instance)) + + data = {"command": "resetInstance"} + return self._post_payload("/clusters/{0}/instances/{1}".format(cluster, + instance), + data) + + def add_instance_tag(self, cluster, instance, tag): + """add tag to an instance""" + data = {"command": "addInstanceTag", + "instanceGroupTag": tag} + return self._post_payload("/clusters/{0}/instances/{1}".format( + cluster, instance), data) + + def del_instance_tag(self, cluster, instance, tag): + """remove tag from instance""" + data = {"command": "removeInstanceTag", + "instanceGroupTag": tag} + return self._post_payload("/clusters/{0}/instances/{1}".format( + cluster, instance), data) + + def add_resource_tag(self, cluster, resource, tag): + """add tag to resource group""" + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + + data = {"command": "addResourceProperty", + "INSTANCE_GROUP_TAG": tag} + return self._post_payload("/clusters/{0}/resourceGroups/{1}/idealState".format( + cluster, resource), data) -def del_resource(host, cluster, resource): - """delete specified resource from cluster""" - if resource not in get_resource_groups(host, cluster): - raise HelixDoesNotExistException( - "ResourceGroup {0} does not exist".format(resource)) + """ + del resource currently does not exist in helix api + def del_resource_tag(self, cluster, resource, tag): + if resource not in self.get_resource_groups(host, cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + + data = {"command": "removeResourceProperty", + "INSTANCE_GROUP_TAG": tag} + return _post_payload(host, + "/clusters/{0}/resourceGroups/{1}/idealState".format( + cluster, resource), data) + """ - page = _delete_page(host, "/clusters/{0}/resourceGroups/{1}".format( - cluster, resource)) - return page + def get_instance_taginfo(self, cluster): + return self._get_page("/clusters/{0}/instances".format( + cluster))["tagInfo"] + + def expand_cluster(self, cluster): + """expand cluster""" + data = {"command": "expandCluster"} + return self._post_payload("/clusters/{0}/".format(cluster), data) + + def expand_resource(self, cluster, resource): + """expand resource""" + data = {"command": "expandResource"} + + return self._post_payload("/clusters/{0}/resourceGroup/{1}/idealState".format( + cluster, resource), data) + + def add_resource_property(self, cluster, resource, properties): + """add resource property properties must be a dictionary of properties""" + properties["command"] = "addResourceProperty" + + return self._post_payload("/clusters/{0}/resourceGroup/{1}/idealState".format( + cluster, resource), properties) + + def _handle_config(self, cluster, configs, command, participant=None, + resource=None): + """helper function to set or delete configs in helix""" + data = {"command": "{0}Config".format(command), + "configs": ",".join( + ["{0}={1}".format(x, y) for x, y in configs.items()])} + + address = "/clusters/{0}/configs/".format(cluster) + if participant: + address += "participant/{0}".format(participant) + elif resource: + address += "resource/{0}".format(resource) + else: + address += "cluster" + + return self._post_payload(address, data) + + def set_config(self, cluster, configs, participant=None, resource=None): + """sets config in helix""" + return self._handle_config(cluster, configs, "set", participant, resource) + + def remove_config(self, cluster, configs, participant=None, resource=None): + """sets config in helix""" + return self._handle_config(host, "remove", cluster, configs, participant, + resource) + + def get_zk_path(self, path): + """get zookeeper path""" + return self._get_page("zkPath/{0}".format(path)) + + def del_zk_path(self, path): + """delete zookeeper path""" + return self._delete_page("zkPath/{0}".format(path)) + + def get_zk_child(self, path): + """get zookeeper child""" + return self._get_page("zkChild/{0}".format(path)) + + def del_zk_child(self, path): + """delete zookeeper child""" + return self._delete_page("zkChild/{0}".format(path)) + + def add_state_model(self, cluster, newstate): + """add state model""" + data = {"command": "addStateModel"} + + return self._post_payload("/clusters/{0}/StateModelDefs".format(cluster), + data, newStateModelDef=newstate) + + def del_instance(self, cluster, instance): + """delete instance""" + if instance not in [x["id"] for x in self.get_instances(cluster)]: + raise HelixDoesNotExistException( + "Instance {0} does not exist.".format(instance)) + + page = self._delete_page("/clusters/{0}/instances/{1}".format(cluster, + instance)) + return page + def del_resource(self, cluster, resource): + """delete specified resource from cluster""" + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) -def del_cluster(host, cluster): - """delete cluster""" - page = _delete_page(host, "/clusters/{0}".format(cluster)) + page = self._delete_page("/clusters/{0}/resourceGroups/{1}".format( + cluster, resource)) + return page - return page + def del_cluster(self, cluster): + """delete cluster""" + page = self._delete_page("/clusters/{0}".format(cluster)) + return page -def send_message(host, cluster, path, **kwargs): - pass + def send_message(self, cluster, path, **kwargs): + pass http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/statemodeldefs.py ---------------------------------------------------------------------- diff --git a/contributors/py-helix-admin/helix/statemodeldefs.py b/contributors/py-helix-admin/helix/statemodeldefs.py new file mode 100644 index 0000000..8446ae9 --- /dev/null +++ b/contributors/py-helix-admin/helix/statemodeldefs.py @@ -0,0 +1,39 @@ +from ordereddict import OrderedDict + +# These essentially come from the java classes defined here. It is cheesey and should probably come from a configuration file. +# https://github.com/linkedin/helix/blob/master/helix-core/src/main/java/com/linkedin/helix/tools/StateModelConfigGenerator.java + +LEADER_STANDBY_STATE_DEF = OrderedDict() +LEADER_STANDBY_STATE_DEF["id"] = "LeaderStandby" +MAP_FIELDS = OrderedDict() +LEADER_STANDBY_STATE_DEF["mapFields"] = MAP_FIELDS +MAP_FIELDS["DROPPED.meta"] = { "count" : "-1" } +MAP_FIELDS["LEADER.meta"] = { "count" : "1" } +LEADER_NEXT = OrderedDict() +MAP_FIELDS["LEADER.next"] = LEADER_NEXT +LEADER_NEXT["DROPPED"] = "STANDBY" +LEADER_NEXT["STANDBY"] = "STANDBY" +LEADER_NEXT["OFFLINE"] = "STANDBY" +MAP_FIELDS["OFFLINE.meta"] = { "count" : "-1" } +OFFLINE_NEXT = OrderedDict() +MAP_FIELDS["OFFLINE.next"] = OFFLINE_NEXT +OFFLINE_NEXT["LEADER"] = "STANDBY" +OFFLINE_NEXT["DROPPED"] = "DROPPED" +OFFLINE_NEXT["STANDBY"] = "STANDBY" +MAP_FIELDS["STANDBY.meta"] = { "count" : "R" } +STANDBY_NEXT = OrderedDict() +MAP_FIELDS["STANDBY.next"] = STANDBY_NEXT +STANDBY_NEXT["LEADER"] = "LEADER" +STANDBY_NEXT["DROPPED"] = "OFFLINE" +STANDBY_NEXT["OFFLINE"] = "OFFLINE" +LIST_FIELDS = OrderedDict() +LEADER_STANDBY_STATE_DEF["listFields"] = LIST_FIELDS +LIST_FIELDS["STATE_PRIORITY_LIST"] = [ "LEADER", "STANDBY", "OFFLINE", "DROPPED" ] +LIST_FIELDS["STATE_TRANSITION_PRIORITYLIST"] = [ "LEADER-STANDBY", "STANDBY-LEADER", "OFFLINE-STANDBY", "STANDBY-OFFLINE", "OFFLINE-DROPPED" ] +SIMPLE_FIELDS = OrderedDict() +LEADER_STANDBY_STATE_DEF["simpleFields"] = SIMPLE_FIELDS +SIMPLE_FIELDS["INITIAL_STATE"] = "OFFLINE" + +STATE_DEF_MAP = { + "LeaderStandby": LEADER_STANDBY_STATE_DEF +} http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/test/test_helix.py ---------------------------------------------------------------------- diff --git a/contributors/py-helix-admin/helix/test/test_helix.py b/contributors/py-helix-admin/helix/test/test_helix.py new file mode 100755 index 0000000..f466086 --- /dev/null +++ b/contributors/py-helix-admin/helix/test/test_helix.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python2.6 + +from helixexceptions import HelixException +from helixexceptions import HelixAlreadyExistsException +from helixexceptions import HelixDoesNotExistException + +from participant import Participant +from partition import Partition +from resourcegroup import ResourceGroup + +from functions import RestHelixFunctions +from zkfunctions import ZookeeperHelixFunctions + +from cluster import ZKCluster +from cluster import Cluster + +import pytest +import random + +INSTANCE_ID = "fake_12345" +INSTANCE_NAME = "fake" +INSTANCE_PORT = 12345 +PARTITION_COUNT = 5 +REPLICA_COUNT = 1 +STATEMODELDEF = "LeaderStandby" +REBALANCE_MODE = "FULL_AUTO" +RESOURCE_NAME = "fake_resource" +TAG_NAME = "fake_tag" + +CLUSTER_ID = "helix_{id}" +ZOOKEEPER_ROOT = "/testing_helix" +ZOOKEEPER_HOST = "localhost:2181" +REST_HOST = "localhost:8100" + +class TestHelixAdmin(object): + #@pytest.mark.int + def test_zookeeper_cluster(self): + cluster = ZKCluster(ZOOKEEPER_HOST, ZOOKEEPER_ROOT, self._get_cluster_name()) + self._cluster_actions(cluster) + + #@pytest.mark.int + def test_rest_cluster(self): + cluster = Cluster(REST_HOST, self._get_cluster_name()) + self._cluster_actions(cluster) + + def _get_cluster_name(self): + return CLUSTER_ID.format(id=random.randint(1, 1000000)) + + def _cluster_actions(self, cluster): + cluster.add_cluster() + cluster.add_resource(RESOURCE_NAME, PARTITION_COUNT, STATEMODELDEF, mode=REBALANCE_MODE) + cluster.add_instance(INSTANCE_NAME, INSTANCE_PORT) + cluster.add_instance_tag(INSTANCE_ID, TAG_NAME) + cluster.add_resource_tag(RESOURCE_NAME, TAG_NAME) + cluster.rebalance(RESOURCE_NAME, REPLICA_COUNT) + participant = cluster.participants.get(INSTANCE_ID) + cluster.del_instance(participant) + resource = cluster.resources.get(RESOURCE_NAME) + cluster.del_resource(resource) + cluster.del_cluster() http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/zkfunctions.py ---------------------------------------------------------------------- diff --git a/contributors/py-helix-admin/helix/zkfunctions.py b/contributors/py-helix-admin/helix/zkfunctions.py new file mode 100644 index 0000000..aed2b74 --- /dev/null +++ b/contributors/py-helix-admin/helix/zkfunctions.py @@ -0,0 +1,522 @@ +"""library to handle helix commands""" +# XXX: Explore using zookeeper transactions for some of the write operations into zookeeper. +# Currently, it looks like ensure_path and create with the make_path argument are not supported in transactions so it isn't usable out of the box. +import json +from ordereddict import OrderedDict +from kazoo.client import KazooClient + +from statemodeldefs import STATE_DEF_MAP +from helixexceptions import HelixException +from helixexceptions import HelixAlreadyExistsException +from helixexceptions import HelixDoesNotExistException +from kazoo.exceptions import NodeExistsError + +RESOURCE_MODES = ["FULL_AUTO", "CUSTOMIZED", "SEMI_AUTO", "USER_DEFINED"] + +IDEAL_STATE_PATH = "/{clusterName}/IDEALSTATES" +RESOURCE_IDEAL_STATE_PATH = "/{clusterName}/IDEALSTATES/{resourceName}" +EXTERNAL_VIEW_STATE_PATH = "/{clusterName}/EXTERNALVIEW/{resourceName}" +INSTANCE_PATH = "/{clusterName}/INSTANCES" +PARTICIPANT_CONFIG_PATH = "/{clusterName}/CONFIGS/PARTICIPANT/{instanceName}" +PARTICIPANTS_CONFIG_PATH = "/{clusterName}/CONFIGS/PARTICIPANT" +CLUSTER_CONFIG_PATH = "/{clusterName}/CONFIGS/CLUSTER/{clusterName}" +CONFIG_PATH = "/{clusterName}/CONFIGS/{configName}/{entityName}" +STATE_MODEL_DEF_PATH = "/{clusterName}/STATEMODELDEFS/{stateModelName}" +LIVE_INSTANCE_PATH = "/{clusterName}/LIVEINSTANCES/{instanceName}" + +HELIX_ZOOKEEPER_PATHS = { + "cluster": [ + "/{clusterName}/CONFIGS", + "/{clusterName}/CONFIGS/RESOURCE", + "/{clusterName}/CONFIGS/CLUSTER", + PARTICIPANTS_CONFIG_PATH, + "/{clusterName}/LIVEINSTANCES", + INSTANCE_PATH, + IDEAL_STATE_PATH, + #"/{clusterName}/RESOURCEASSIGNMENTS", + "/{clusterName}/EXTERNALVIEW", + "/{clusterName}/STATEMODELDEFS", + "/{clusterName}/CONTROLLER", + "/{clusterName}/CONTROLLER/HISTORY", + "/{clusterName}/CONTROLLER/ERRORS", + "/{clusterName}/CONTROLLER/MESSAGES", + "/{clusterName}/CONTROLLER/STATUSUPDATES", + "/{clusterName}/PROPERTYSTORE", + ], + "resource": [ + RESOURCE_IDEAL_STATE_PATH, + "/{clusterName}/RESOURCEASSIGNMENTS/{resourceName}", + EXTERNAL_VIEW_STATE_PATH + ], + "instance": [ + #"/{clusterName}/LIVEINSTANCES/{instanceName}", + "/{clusterName}/INSTANCES/{instanceName}", + "/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES", + "/{clusterName}/INSTANCES/{instanceName}/ERRORS", + "/{clusterName}/INSTANCES/{instanceName}/STATUSUPDATES", + "/{clusterName}/INSTANCES/{instanceName}/MESSAGES" + ], + "statemodel": [ + STATE_MODEL_DEF_PATH + ] +} + +CLUSTER_CONFIG_TEMPLATE = OrderedDict() +CLUSTER_CONFIG_TEMPLATE["id"] = "{clusterName}" +CLUSTER_CONFIG_TEMPLATE["mapFields"] = {} +CLUSTER_CONFIG_TEMPLATE["listFields"] = {} +CLUSTER_CONFIG_TEMPLATE["simpleFields"] = {"allowParticipantAutoJoin": "true"} + + +class ZookeeperHelixFunctions(object): + """Zookeeper based client to manage helix clusters""" + def __init__(self, zookeeper_connect_string, zk_root): + """Constructor.""" + self.zk = KazooClient(hosts=zookeeper_connect_string) + self.zk.start() + self.zk_root = zk_root + + def _list_path(self, path): + """List a zookeeper path.""" + return self.zk.get_children(path) + + def _is_valid_cluster(self, cluster): + """Validate cluster configuration.""" + for path in HELIX_ZOOKEEPER_PATHS.get("cluster"): + full_path = self._build_path(path.format(clusterName=cluster)) + if not self.zk.exists(full_path): + return False + return True + + def _build_path(self, path): + """Construct zookeeper path.""" + return "".join([self.zk_root, path]) + + @classmethod + def _build_instance_entry(cls, instance, enabled="true"): + """Create the data entry for an instance.""" + host, port = instance.split(":") + instance_data = OrderedDict() + instance_data["id"] = "{host}_{port}".format(host=host, port=port) + instance_data["listFields"] = {} + instance_data["mapFields"] = {} + instance_data["simpleFields"] = OrderedDict() + instance_data["simpleFields"]["HELIX_ENABLED"] = enabled + instance_data["simpleFields"]["HELIX_HOST"] = host + instance_data["simpleFields"]["HELIX_PORT"] = port + return instance_data + + def create_root(self): + """Initialize zookeeper root""" + path = self._build_path("") + if not self.zk.exists(path): + self.zk.create(path) + return True + + def get_clusters(self): + """ querys helix cluster for all clusters """ + if self.zk.exists(self.zk_root): + return [ cluster for cluster in self._list_path(self.zk_root) if self._is_valid_cluster(cluster) ] + else: + return [] + + def get_resource_groups(self, cluster): + """ querys helix cluster for resources groups of the current cluster""" + return self._list_path(self._build_path(IDEAL_STATE_PATH.format(clusterName=cluster))) + + def get_resource_tags(self, cluster): + """returns a dict of resource tags for a cluster""" + resource_tags = {} + for resource in self.get_resource_groups(cluster): + resource_data, resource_meta = self._get_resource_group(cluster, resource) + tag = resource_data.get("INSTANCE_GROUP_TAG") + if tag: + resource_tags[tag] = [resource] + + return resource_tags + + def _get_resource_group(self, cluster, resource): + """ gets the ideal state of the specified resource group of the + current cluster""" + + if resource not in self.get_resource_groups(cluster): + raise HelixException( + "{resource} is not a resource group of {cluster}".format(resource=resource, cluster=cluster)) + + data, stat = self.zk.get(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource))) + return (json.loads(data), stat) + + def get_resource_group(self, cluster, resource): + """ COMPAT: gets the ideal state of the specified resource group of the + current cluster""" + + return self._get_resource_group(cluster, resource)[0] + + def _get_ideal_state(self, cluster, resource): + """ gets the ideal state of the specified resource group of the + current cluster""" + + if resource not in self.get_resource_groups(cluster): + raise HelixException( + "{0} is not a resource group of {1}".format(resource, cluster)) + + return self._get_resource_group(cluster, resource)["mapFields"] + + def get_ideal_state(self, cluster, resource): + """ COMPAT: gets the ideal state of the specified resource group of the + current cluster""" + + return self._get_ideal_state(cluster, resource)[0] + + def _get_external_view(self, cluster, resource): + """return the external view for a given cluster and resource""" + if resource not in self.get_resource_groups(cluster): + raise HelixException( + "{0} is not a resource group of {1}".format(resource, cluster)) + data, stat = self.zk.get(self._build_path(EXTERNAL_VIEW_STATE_PATH.format(clusterName=cluster, resourceName=resource))) + return (json.loads(data)["mapFields"], stat) + + def get_external_view(self, cluster, resource): + """ COMPAT: return the external view for a given cluster and resource""" + return self._get_external_view(cluster, resource)[0] + + def get_instances(self, cluster): + """get list of instances registered to the cluster""" + if not cluster: + raise HelixException("Cluster must be set before " + "calling this function") + + instances = [] + for instance in self._list_path(self._build_path(PARTICIPANTS_CONFIG_PATH.format(clusterName=cluster))): + instance_data = json.loads(self.zk.get(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance)))[0]) + if self.zk.exists(self._build_path(LIVE_INSTANCE_PATH.format(clusterName=cluster, instanceName=instance))): + instance_data["simpleFields"]["Alive"] = "true" + else: + instance_data["simpleFields"]["Alive"] = "false" + instances.append(instance_data) + return instances + + def _get_instance_detail(self, cluster, instance): + """get details of an instance""" + data, stat = self.zk.get(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance))) + return (json.loads(data), stat) + + def get_instance_detail(self, cluster, instance): + """ COMPAT: get details of an instance""" + return self._get_instance_detail(cluster, instance)[0] + + def _get_config(self, cluster, config, entity): + """get requested config""" + data, stat = self.zk.get(self._build_path(CONFIG_PATH.format(clusterName=cluster, configName=config, entityName=entity))) + return (json.loads(data), stat) + + def get_config(self, cluster, config, entity): + """ COMPAT: get requested config""" + return self._get_config(cluster, config, entity)[0] + + def add_cluster(self, cluster): + """add a cluster to helix""" + if cluster in self.get_clusters(): + raise HelixAlreadyExistsException( + "Cluster {0} already exists".format(cluster)) + + for path in HELIX_ZOOKEEPER_PATHS.get("cluster"): + self.zk.ensure_path(self._build_path(path.format(clusterName=cluster))) + + data = CLUSTER_CONFIG_TEMPLATE + data["id"] = cluster + + try: + self.zk.create(self._build_path(CLUSTER_CONFIG_PATH.format(clusterName=cluster)), json.dumps(data)) + except NodeExistsError: + # Ignore existing cluster + pass + + # Insert state defs if they don't exist + for state_def in STATE_DEF_MAP: + if not self.zk.exists(self._build_path(STATE_MODEL_DEF_PATH.format(clusterName=cluster, stateModelName=state_def))): + self.zk.create(self._build_path(STATE_MODEL_DEF_PATH.format(clusterName=cluster, stateModelName=state_def)), json.dumps(STATE_DEF_MAP[state_def])) + + return True + + def add_instance(self, cluster, instances, port): + """add a list of instances to a cluster""" + if cluster not in self.get_clusters(): + raise HelixDoesNotExistException( + "Cluster {0} does not exist".format(cluster)) + + if not isinstance(instances, list): + instances = [instances] + instances = ["{instance}:{port}".format(instance=instance, port=port) for instance in instances] + try: + newinstances = set(instances) + oldinstances = set( + [x["id"].replace('_', ':') for x in self.get_instances(cluster)]) + instances = list(newinstances - oldinstances) + except HelixException: + # this will get thrown if instances is empty, + # which if we're just populating should happen + pass + + if instances: + for instance in instances: + data = self._build_instance_entry(instance) + self.zk.create(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance.replace(':', '_'))), json.dumps(data)) + for path in HELIX_ZOOKEEPER_PATHS.get("instance"): + self.zk.ensure_path(self._build_path(path.format(clusterName=cluster, instanceName=instance.replace(':', '_')))) + return True + else: + raise HelixAlreadyExistsException( + "All instances given already exist in cluster") + + + def rebalance(self, cluster, resource, replicas, key=""): + """rebalance the given resource group""" + if resource not in self.get_resource_groups(cluster): + raise HelixException( + "{0} is not a resource group of {1}".format(resource, cluster)) + + # TODO: key usage is currently not supported. + if not key == "": + raise NotImplementedError + + resource_data, resource_meta = self._get_resource_group(cluster, resource) + resource_data["simpleFields"]["REPLICAS"] = replicas + self.zk.set(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource)), json.dumps(resource_data)) + + return True + + def activate_cluster(self, cluster, grand_cluster, enabled=True): + """activate the cluster with the grand cluster""" + if grand_cluster not in self.get_clusters(): + raise HelixException( + "grand cluster {0} does not exist".format(grand_cluster)) + + raise NotImplementedError + + def deactivate_cluster(self, cluster, grand_cluster): + """deactivate the cluster with the grand cluster""" + return self.activate_cluster(cluster, grand_cluster, enabled=False) + + + def add_resource(self, cluster, resource, partitions, + state_model_def, mode="", state_model_factory_name="DEFAULT"): + """Add given resource group""" + if resource in self.get_resource_groups(cluster): + raise HelixAlreadyExistsException( + "ResourceGroup {0} already exists".format(resource)) + + data = {"id": resource, + "mapFields": {}, + "listFields": {}, + "simpleFields": { + "IDEAL_STATE_MODE": "AUTO", + "NUM_PARTITIONS": partitions, + "REBALANCE_MODE": mode, + "REPLICAS": "0", + "STATE_MODEL_DEF_REF": state_model_def, + "STATE_MODEL_FACTORY_NAME": state_model_factory_name + } + } + + if mode: + if mode in RESOURCE_MODES: + data["mode"] = mode + else: + raise ValueError("Invalid mode ({mode})".format(mode=mode)) + + self.zk.create(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource)), json.dumps(data)) + return True + + def enable_resource(self, cluster, resource, enabled=True): + """enable or disable specified resource""" + raise NotImplementedError + + def disable_resource(self, cluster, resource): + """function for disabling resources""" + return self.enable_resource(cluster, resource, enabled=False) + + def alter_ideal_state(self, cluster, resource, newstate): + """alter ideal state""" + raise NotImplementedError + + def enable_instance(self, cluster, instance, enabled=True): + """enable instance within cluster""" + raise NotImplementedError + + def disable_instance(self, cluster, instance): + """wrapper for ease of use for disabling an instance""" + return self.enable_instance(cluster, instance, enabled=False) + + def swap_instance(self, cluster, old, new): + """swap instance""" + raise NotImplementedError + + def enable_partition(self, cluster, resource, partition, instance, + enabled=True): + """enable Partition """ + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + raise NotImplementedError + + def disable_partition(self, cluster, resource, partitions, instance): + """disable Partition """ + return self.enable_partition(cluster, resource, partitions, instance, + enabled=False) + + def reset_partition(self, cluster, resource, partitions, instance): + """reset partition""" + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + + raise NotImplementedError + + def reset_resource(self, cluster, resource): + """reset resource""" + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + + raise NotImplementedError + + def reset_instance(self, cluster, instance): + """reset instance""" + if instance not in self.get_instances(cluster): + raise HelixDoesNotExistException( + "Instance {0} does not exist".format(instance)) + + raise NotImplementedError + + def add_instance_tag(self, cluster, instance, tag): + """add tag to an instance""" + instance_data, instance_meta = self._get_instance_detail(cluster, instance) + instance_tags = instance_data.get("listFields").get("TAG_LIST", []) + if tag in instance_tags: + raise HelixAlreadyExistsException( + "Tag ({tag}) already exists for instance ({instance}).".format(tag=tag, instance=instance)) + + instance_tags.append(tag) + instance_data["listFields"]["TAG_LIST"] = instance_tags + + # XXX: Apply some retry logic here + self.zk.set(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance)), json.dumps(instance_data), version=instance_meta.version) + return True + + def del_instance_tag(self, cluster, instance, tag): + """remove tag from instance""" + if instance not in [x["id"] for x in self.get_instances(cluster)]: + raise HelixDoesNotExistException( + "Instance {0} does not exist.".format(instance)) + + def add_resource_tag(self, cluster, resource, tag): + """add tag to resource group""" + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + + resource_data, resource_stat = self._get_resource_group(cluster, resource) + resource_data["simpleFields"]["INSTANCE_GROUP_TAG"] = tag + + self.zk.set(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource)), json.dumps(resource_data), version=resource_stat.version) + return True + + def del_resource_tag(self, cluster, resource, tag): + """Delete resource tag.""" + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + raise NotImplementedError + + def get_instance_taginfo(self, cluster): + """Get resource tag info.""" + instance_tags = {} + for instance in self.get_instances(cluster): + list_fields = instance.get("listFields") + if "TAG_LIST" in list_fields: + for tag in list_fields.get("TAG_LIST"): + if tag in instance_tags: + instance_tags[tag].append(instance.get("id")) + else: + instance_tags[tag] = [instance.get("id")] + return instance_tags + + def expand_cluster(self, cluster): + """expand cluster""" + raise NotImplementedError + + def expand_resource(self, cluster, resource): + """expand resource""" + raise NotImplementedError + + def add_resource_property(self, cluster, resource, properties): + """Add resource property. Properties must be a dictionary of properties.""" + raise NotImplementedError + + def set_config(self, cluster, configs, participant=None, resource=None): + """sets config in helix""" + raise NotImplementedError + + def remove_config(self, cluster, configs, participant=None, resource=None): + """sets config in helix""" + raise NotImplementedError + + def get_zk_path(self, path): + """get zookeeper path""" + return self.zk.get(self._build_path(path)) + + def del_zk_path(self, path): + """delete zookeeper path""" + return self.zk.delete(self._build_path(path)) + + def add_state_model(self, cluster, newstate): + """add state model""" + raise NotImplementedError + + def del_instance(self, cluster, instance): + """delete instance""" + if cluster not in self.get_clusters(): + raise HelixDoesNotExistException( + "Cluster {0} does not exist.".format(cluster)) + + if instance not in [x["id"] for x in self.get_instances(cluster)]: + raise HelixDoesNotExistException( + "Instance {0} does not exist.".format(instance)) + + self.zk.delete(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance.replace(':', '_')))) + + # Reverse zookeeper structure for destruction. + for path in HELIX_ZOOKEEPER_PATHS.get("instance")[::-1]: + self.zk.delete(self._build_path(path.format(clusterName=cluster, instanceName=instance.replace(':', '_')))) + return True + + def del_resource(self, cluster, resource): + """delete specified resource from cluster""" + if cluster not in self.get_clusters(): + raise HelixDoesNotExistException( + "Cluster {0} does not exist.".format(cluster)) + + if resource not in self.get_resource_groups(cluster): + raise HelixDoesNotExistException( + "ResourceGroup {0} does not exist".format(resource)) + + self.zk.delete(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource))) + return True + + def del_cluster(self, cluster): + """delete cluster""" + if cluster not in self.get_clusters(): + raise HelixDoesNotExistException( + "Cluster {0} does not exist.".format(cluster)) + + self.zk.delete(self._build_path(CLUSTER_CONFIG_PATH.format(clusterName=cluster))) + + for path in HELIX_ZOOKEEPER_PATHS.get("cluster")[::-1]: + self.zk.ensure_path(self._build_path(path.format(clusterName=cluster))) + + return True + + def send_message(self, cluster, path, **kwargs): + """Send helix IPC message.""" + raise NotImplementedError
