http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/topology/topologycontext.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/topology/topologycontext.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/topology/topologycontext.py deleted file mode 100644 index b85d97b..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/topology/topologycontext.py +++ /dev/null @@ -1,466 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import constants - - -class Topology: - """ - Represents the topology provided by the Cloud Controller - """ - - def __init__(self): - self.service_map = {} - """ :type : dict[str, Service] """ - self.initialized = False - """ :type : bool """ - self.json_str = None - """ :type : str """ - - def get_services(self): - """ - Provides the list of services on the topology - :return: The list of Service objects - :rtype: list[Service] - """ - return self.service_map.values() - - def get_service(self, service_name): - """ - Provides the service information for the given service name - :param str service_name: service name to be retrieved - :return: Service object of the service, None if the provided service name is invalid - :rtype: Service - """ - if service_name in self.service_map: - return self.service_map[service_name] - - return None - - def add_service(self, service): - """ - Adds a service to the list of services on the topology - - :param Service service: - :return: void - """ - self.service_map[service.service_name] = service - - def add_services(self, services): - """ - - :param list[Service] services: - :return: void - """ - for service in services: - self.add_service(service) - - def remove_service(self, service_name): - """ - Removes the service of the provided service name - :param str service_name: - :return: void - """ - if service_name in self.service_map: - self.service_map.pop(service_name) - - def service_exists(self, service_name): - """ - Checks if the service of the provided service name exists - :param str service_name: - :return: True if the service exists, False if otherwise - :rtype: bool - """ - return service_name in self.service_map - - def clear(self): - """ - Clears the service information list - :return: void - """ - self.service_map = {} - - def __str__(self): - """ - to string override - :return: - """ - return "Topology [serviceMap= %r , initialized= %r ]" % (self.service_map, self.initialized) - - -class Service: - """ - Represents a service on the topology - """ - - def __init__(self, service_name, service_type): - self.service_name = service_name - """ :type : str """ - self.service_type = service_type - """ :type : str """ - self.cluster_id_cluster_map = {} - """ :type : dict[str, Cluster] """ - self.port_map = {} - """ :type : dict[str, Port] """ - self.properties = {} - """ :type : dict[str, str] """ - - def get_clusters(self): - """ - Provides the list of clusters in the particular service - :return: The list of Cluster objects - :rtype: list[Cluster] - """ - return self.cluster_id_cluster_map.values() - - def add_cluster(self, cluster): - """ - Adds a cluster to the service - :param Cluster cluster: the cluster to be added - :return: void - """ - self.cluster_id_cluster_map[cluster.cluster_id] = cluster - - def remove_cluster(self, cluster_id): - if cluster_id in self.cluster_id_cluster_map: - self.cluster_id_cluster_map.pop(cluster_id) - - def cluster_exists(self, cluster_id): - """ - Checks if the cluster with the given cluster id exists for ther service - :param str cluster_id: - :return: True if the cluster for the given cluster id exists, False if otherwise - :rtype: bool - """ - return cluster_id in self.cluster_id_cluster_map - - def get_cluster(self, cluster_id): - """ - Provides the Cluster information for the provided cluster id - :param str cluster_id: the cluster id to search for - :return: Cluster object for the given cluster id, None if the cluster id is invalid - :rtype: Cluster - """ - if cluster_id in self.cluster_id_cluster_map: - return self.cluster_id_cluster_map[cluster_id] - - return None - - def get_ports(self): - """ - Returns the list of ports in the particular service - :return: The list of Port object - :rtype: list[Port] - """ - return self.port_map.values() - - def get_port(self, proxy_port): - """ - Provides the port information for the provided proxy port - :param str proxy_port: - :return: Port object for the provided port, None if port is invalid - :rtype: Port - """ - if proxy_port in self.port_map: - return self.port_map[proxy_port] - - return None - - def add_port(self, port): - self.port_map[port.proxy] = port - - def add_ports(self, ports): - for port in ports: - self.add_port(port) - - -class Cluster: - """ - Represents a cluster for a service - """ - - def __init__(self, service_name="", cluster_id="", deployment_policy_name="", autoscale_policy_name=""): - self.service_name = service_name - """ :type : str """ - self.cluster_id = cluster_id - """ :type : str """ - self.deployment_policy_name = deployment_policy_name - """ :type : str """ - self.autoscale_policy_name = autoscale_policy_name - """ :type : str """ - self.hostnames = [] - """ :type : list[str] """ - self.member_map = {} - """ :type : dict[str, Member] """ - - self.tenant_range = None - """ :type : str """ - self.is_lb_cluster = False - """ :type : bool """ - self.is_kubernetes_cluster = False - """ :type : bool """ - # self.status = None - # """ :type : str """ - self.load_balancer_algorithm_name = None - """ :type : str """ - self.properties = {} - """ :type : dict[str, str] """ - self.member_list_json = None - """ :type : str """ - self.app_id = "" - """ :type : str """ - # Not relevant to cartridge agent - # self.instance_id_instance_context_map = {} - # """ :type : dict[str, ClusterInstance] """ - - def add_hostname(self, hostname): - self.hostnames.append(hostname) - - def set_tenant_range(self, tenant_range): - self.validate_tenant_range(tenant_range) - self.tenant_range = tenant_range - - def get_members(self): - """ - Provides the list of member information in the cluster - :return: The list of Member object - :rtype: list[Member] - """ - return self.member_map.values() - - def add_member(self, member): - self.member_map[member.member_id] = member - - def remove_member(self, member_id): - if self.member_exists(member_id): - self.member_map.pop(member_id) - - def get_member(self, member_id): - """ - Provides the member information for the provided member id - :param str member_id: - :return: Member object for the provided member id, None if member id is invalid - :rtype: Member - """ - if self.member_exists(member_id): - return self.member_map[member_id] - - return None - - def member_exists(self, member_id): - """ - Checks if the member for the provided member id exists in this cluster - :param str member_id: member id to be searched - :return: True if the member exists, False if otherwise - :rtype: bool - """ - return member_id in self.member_map - - def __str__(self): - return "Cluster [serviceName=" + self.service_name + ", clusterId=" + self.cluster_id \ - + ", autoscalePolicyName=" + self.autoscale_policy_name + ", deploymentPolicyName=" \ - + self.deployment_policy_name + ", hostNames=" + self.hostnames + ", tenantRange=" + self.tenant_range \ - + ", isLbCluster=" + self.is_lb_cluster + ", properties=" + self.properties + "]" - - def tenant_id_in_range(self, tenant_id): - """ - Check whether a given tenant id is in tenant range of the cluster. - :param str tenant_id: tenant id to be checked - :return: True if the tenant id is in tenant id range, False if otherwise - :rtype: bool - """ - if self.tenant_range is None: - return False - - if self.tenant_range == "*": - return True - else: - arr = self.tenant_range.split(constants.TENANT_RANGE_DELIMITER) - tenant_start = int(arr[0]) - if tenant_start <= tenant_id: - tenant_end = arr[1] - if tenant_end == "*": - return True - else: - if tenant_id <= int(tenant_end): - return True - - return False - - def validate_tenant_range(self, tenant_range): - """ - Validates the tenant range to be either '*' or a delimeted range of numbers - :param str tenant_range: The tenant range string to be validated - :return: void if the provided tenant range is valid, RuntimeError if otherwise - :exception: RuntimeError if the tenant range is invalid - """ - valid = False - if tenant_range == "*": - valid = True - else: - arr = tenant_range.split(constants.TENANT_RANGE_DELIMITER) - if len(arr) == 2: - if arr[0].isdigit() and arr[1].isdigit(): - valid = True - elif arr[0].isdigit() and arr[1] == "*": - valid = True - - if not valid: - raise RuntimeError("Tenant range %r is not valid" % tenant_range) - - -class Member: - """ - Represents a member on a particular cluster - """ - - def __init__(self, service_name="", cluster_id="", network_partition_id="", partition_id="", member_id="", - cluster_instance_id=""): - self.service_name = service_name - """ :type : str """ - self.cluster_id = cluster_id - """ :type : str """ - self.network_partition_id = network_partition_id - """ :type : str """ - self.cluster_instance_id = cluster_instance_id - """ :type : str """ - self.partition_id = partition_id - """ :type : str """ - self.member_id = member_id - """ :type : str """ - self.port_map = {} - """ :type : dict[str, Port] """ - self.init_time = None - """ :type : int """ - - self.member_public_ips = None - """ :type : str """ - self.member_default_public_ip = None - """ :type : str """ - self.status = None - """ :type : str """ - self.member_private_ips = None - """ :type : str """ - self.member_default_private_ip = None - """ :type : str """ - self.properties = {} - """ :type : dict[str, str] """ - self.lb_cluster_id = None - """ :type : str """ - self.json_str = None - """ :type : str """ - - def is_active(self): - """ - Checks if the member is in active state - :return: True if active, False if otherwise - :rtype: bool - """ - return self.status == MemberStatus.Active - - def get_ports(self): - """ - Provides the list of the ports in the member - :return: List of Port objects - :rtype: list[Port] - """ - return self.port_map.values() - - def get_port(self, proxy): - """ - Provides the port information for the given port id - :param str proxy: The port id - :return: Port object of the provided port id, None if otherwise - :rtype: Port - """ - if proxy in self.port_map: - return self.port_map[proxy] - - return None - - def add_port(self, port): - self.port_map[port.proxy] = port - - def add_ports(self, ports): - for port in ports: - self.add_port(port) - - -class Port: - """ - Represents a port on a particular member - """ - - def __init__(self, protocol, value, proxy): - self.protocol = protocol - """ :type : str """ - self.value = value - """ :type : str """ - self.proxy = proxy - """ :type : str """ - - def __str__(self): - return "Port [protocol=%r, value=%r proxy=%r]" % (self.protocol, self.value, self.proxy) - - -class ServiceType: - """ - ServiceType enum - """ - SingleTenant = 1 - MultiTenant = 2 - - -class ClusterStatus: - """ - ClusterStatus enum - """ - Created = 1 - In_Maintenance = 2 - Removed = 3 - - -class MemberStatus: - """ - MemberStatus enum - """ - Created = "Created" - Initialized = "Initialized" - Starting = "Starting" - Active = "Active" - In_Maintenance = "In_Maintenance" - ReadyToShutDown = "ReadyToShutDown" - Suspended = "Suspended" - Terminated = "Terminated" - - -class TopologyContext: - """ - Handles and maintains a model of the topology provided by the Cloud Controller - """ - topology = Topology() - - @staticmethod - def get_topology(): - if TopologyContext.topology is None: - TopologyContext.topology = Topology() - return TopologyContext.topology - - @staticmethod - def update(topology): - TopologyContext.topology = topology - TopologyContext.topology.initialized = True \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py new file mode 100644 index 0000000..086df0f --- /dev/null +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py @@ -0,0 +1,215 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import paho.mqtt.publish as publish + +from modules.event.instance.status.events import * +from modules.util.log import * +from modules.util import cartridgeagentutils +import healthstats +import constants +from config import Config + + +log = LogFactory().get_log(__name__) + +started = False +activated = False +ready_to_shutdown = False +maintenance = False + +publishers = {} +""" :type : dict[str, EventPublisher] """ + + +def publish_instance_started_event(): + global started, log + if not started: + log.info("Publishing instance started event") + + application_id = Config.application_id + service_name = Config.service_name + cluster_id = Config.cluster_id + member_id = Config.member_id + instance_id = Config.instance_id + cluster_instance_id = Config.cluster_instance_id + network_partition_id = Config.network_partition_id + partition_id = Config.partition_id + + instance_started_event = InstanceStartedEvent( + application_id, + service_name, + cluster_id, + cluster_instance_id, + member_id, + instance_id, + network_partition_id, + partition_id) + + publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_STARTED_EVENT) + publisher.publish(instance_started_event) + started = True + log.info("Instance started event published") + else: + log.warn("Instance already started") + + +def publish_instance_activated_event(health_stat_plugin): + global activated, log + if not activated: + # Wait for all ports to be active + listen_address = Config.listen_address + configuration_ports = Config.ports + ports_active = cartridgeagentutils.wait_until_ports_active( + listen_address, + configuration_ports, + int(Config.read_property("port.check.timeout", critical=False))) + + if ports_active: + log.info("Publishing instance activated event") + service_name = Config.service_name + cluster_id = Config.cluster_id + member_id = Config.member_id + instance_id = Config.instance_id + cluster_instance_id = Config.cluster_instance_id + network_partition_id = Config.network_partition_id + partition_id = Config.partition_id + + instance_activated_event = InstanceActivatedEvent( + service_name, + cluster_id, + cluster_instance_id, + member_id, + instance_id, + network_partition_id, + partition_id) + + publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_ACTIVATED_EVENT) + publisher.publish(instance_activated_event) + + log.info("Instance activated event published") + log.info("Starting health statistics notifier") + + health_stat_publishing_enabled = Config.read_property(constants.CEP_PUBLISHER_ENABLED, False) + + if health_stat_publishing_enabled: + interval_default = 15 # seconds + interval = Config.read_property("stats.notifier.interval", False) + if interval is not None and len(interval) > 0: + try: + interval = int(interval) + except ValueError: + interval = interval_default + else: + interval = interval_default + + health_stats_publisher = healthstats.HealthStatisticsPublisherManager(interval, health_stat_plugin) + log.info("Starting Health statistics publisher with interval %r" % interval) + health_stats_publisher.start() + else: + log.warn("Statistics publisher is disabled") + + activated = True + log.info("Health statistics notifier started") + else: + log.error("Ports activation timed out. Aborting InstanceActivatedEvent publishing [IPAddress] %s [Ports] %s" + % (listen_address, configuration_ports)) + else: + log.warn("Instance already activated") + + +def publish_maintenance_mode_event(): + global maintenance, log + if not maintenance: + log.info("Publishing instance maintenance mode event") + + service_name = Config.service_name + cluster_id = Config.cluster_id + member_id = Config.member_id + instance_id = Config.instance_id + cluster_instance_id = Config.cluster_instance_id + network_partition_id = Config.network_partition_id + partition_id = Config.partition_id + + instance_maintenance_mode_event = InstanceMaintenanceModeEvent( + service_name, + cluster_id, + cluster_instance_id, + member_id, + instance_id, + network_partition_id, + partition_id) + + publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_MAINTENANCE_MODE_EVENT) + publisher.publish(instance_maintenance_mode_event) + + maintenance = True + log.info("Instance Maintenance mode event published") + else: + log.warn("Instance already in a Maintenance mode...") + + +def publish_instance_ready_to_shutdown_event(): + global ready_to_shutdown, log + if not ready_to_shutdown: + log.info("Publishing instance activated event") + + service_name = Config.service_name + cluster_id = Config.cluster_id + member_id = Config.member_id + instance_id = Config.instance_id + cluster_instance_id = Config.cluster_instance_id + network_partition_id = Config.network_partition_id + partition_id = Config.partition_id + + instance_shutdown_event = InstanceReadyToShutdownEvent( + service_name, + cluster_id, + cluster_instance_id, + member_id, + instance_id, + network_partition_id, + partition_id) + + publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_READY_TO_SHUTDOWN_EVENT) + publisher.publish(instance_shutdown_event) + + ready_to_shutdown = True + log.info("Instance ReadyToShutDown event published") + else: + log.warn("Instance already in a ReadyToShutDown event...") + + +def get_publisher(topic): + if topic not in publishers: + publishers[topic] = EventPublisher(topic) + + return publishers[topic] + + +class EventPublisher: + """ + Handles publishing events to topics to the provided message broker + """ + def __init__(self, topic): + self.__topic = topic + + def publish(self, event): + mb_ip = Config.read_property(constants.MB_IP) + mb_port = Config.read_property(constants.MB_PORT) + payload = event.to_json() + publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py new file mode 100644 index 0000000..0578d36 --- /dev/null +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from Queue import Queue + +import threading +import paho.mqtt.client as mqtt + + +class EventSubscriber(threading.Thread): + """ + Provides functionality to subscribe to a given topic on the Stratos MB and + register event handlers for various events. + """ + + def __init__(self, topic, ip, port): + threading.Thread.__init__(self) + + self.__event_queue = Queue(maxsize=0) + self.__event_executor = EventExecutor(self.__event_queue) + + self.log = LogFactory().get_log(__name__) + + self.__mb_client = None + self.__topic = topic + self.__subscribed = False + self.__ip = ip + self.__port = port + + def run(self): + # Start the event executor thread + self.__event_executor.start() + self.__mb_client = mqtt.Client() + self.__mb_client.on_connect = self.on_connect + self.__mb_client.on_message = self.on_message + + self.log.debug("Connecting to the message broker with address %r:%r" % (self.__ip, self.__port)) + self.__mb_client.connect(self.__ip, self.__port, 60) + self.__subscribed = True + self.__mb_client.loop_forever() + + def register_handler(self, event, handler): + """ + Adds an event handler function mapped to the provided event. + :param str event: Name of the event to attach the provided handler + :param handler: The handler function + :return: void + :rtype: void + """ + self.__event_executor.register_event_handler(event, handler) + self.log.debug("Registered handler for event %r" % event) + + def on_connect(self, client, userdata, flags, rc): + self.log.debug("Connected to message broker.") + self.__mb_client.subscribe(self.__topic) + self.log.debug("Subscribed to %r" % self.__topic) + + def on_message(self, client, userdata, msg): + self.log.debug("Message received: %s:\n%s" % (msg.topic, msg.payload)) + self.__event_queue.put(msg) + + def is_subscribed(self): + """ + Checks if this event subscriber is successfully subscribed to the provided topic + :return: True if subscribed, False if otherwise + :rtype: bool + """ + return self.__subscribed + + +class EventExecutor(threading.Thread): + """ + Polls the event queue and executes event handlers for each event + """ + def __init__(self, event_queue): + threading.Thread.__init__(self) + self.__event_queue = event_queue + # TODO: several handlers for one event + self.__event_handlers = {} + self.log = LogFactory().get_log(__name__) + + def run(self): + while True: + event_msg = self.__event_queue.get() + event = event_msg.topic.rpartition('/')[2] + if event in self.__event_handlers: + handler = self.__event_handlers[event] + try: + self.log.debug("Executing handler for event %r" % event) + handler(event_msg) + except: + self.log.exception("Error processing %r event" % event) + else: + + self.log.debug("Event handler not found for event : %r" % event) + + def register_event_handler(self, event, handler): + self.__event_handlers[event] = handler + + def terminate(self): + self.terminate() + + +from modules.util.log import LogFactory \ No newline at end of file
