PCA - Module refactoring to reduce number of modules and module nesting
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7f5ca0c1 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7f5ca0c1 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7f5ca0c1 Branch: refs/heads/master Commit: 7f5ca0c1613d611436b195f375545bedbe19d3aa Parents: ee5de9d Author: Chamila de Alwis <[email protected]> Authored: Fri May 1 03:56:12 2015 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Thu Jul 30 00:17:28 2015 -0400 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/agent.py | 13 +- .../cartridge.agent/cartridge.agent/entity.py | 636 +++++++++++++++++++ .../cartridge.agent/logpublisher.py | 287 +++++++++ .../modules/datapublisher/__init__.py | 18 - .../modules/datapublisher/logpublisher.py | 289 --------- .../modules/event/eventhandler.py | 53 +- .../modules/event/tenant/events.py | 2 +- .../modules/event/topology/events.py | 2 +- .../modules/publisher/__init__.py | 16 - .../publisher/cartridgeagentpublisher.py | 215 ------- .../modules/subscriber/__init__.py | 17 - .../modules/subscriber/eventsubscriber.py | 117 ---- .../cartridge.agent/modules/tenant/__init__.py | 16 - .../modules/tenant/tenantcontext.py | 185 ------ .../modules/topology/__init__.py | 16 - .../modules/topology/topologycontext.py | 466 -------------- .../cartridge.agent/publisher.py | 215 +++++++ .../cartridge.agent/subscriber.py | 117 ++++ 18 files changed, 1283 insertions(+), 1397 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py index ac4aa07..a092764 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py @@ -18,16 +18,15 @@ import threading -from modules.subscriber.eventsubscriber import EventSubscriber -from modules.publisher import cartridgeagentpublisher +from subscriber import EventSubscriber +import publisher from modules.event.instance.notifier.events import * from modules.event.tenant.events import * from modules.event.topology.events import * from modules.event.application.signup.events import * from modules.event.domain.mapping.events import * -from modules.tenant.tenantcontext import * -from modules.topology.topologycontext import * -from modules.datapublisher.logpublisher import * +from entity import * +from logpublisher import * from config import Config from modules.event.eventhandler import EventHandler import constants @@ -85,7 +84,7 @@ class CartridgeAgent(threading.Thread): self.__event_handler.on_instance_started_event() # Publish instance started event - cartridgeagentpublisher.publish_instance_started_event() + publisher.publish_instance_started_event() # Execute start servers extension try: @@ -98,7 +97,7 @@ class CartridgeAgent(threading.Thread): if repo_url is None or str(repo_url).strip() == "": self.__log.info("No artifact repository found") self.__event_handler.on_instance_activated_event() - cartridgeagentpublisher.publish_instance_activated_event(Config.health_stat_plugin) + publisher.publish_instance_activated_event(Config.health_stat_plugin) else: self.__log.info( "Artifact repository found, waiting for artifact updated event to checkout artifacts: [repo_url] %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py new file mode 100644 index 0000000..fd40511 --- /dev/null +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py @@ -0,0 +1,636 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import constants + + +class Topology: + """ + Represents the topology provided by the Cloud Controller + """ + + def __init__(self): + self.service_map = {} + """ :type : dict[str, Service] """ + self.initialized = False + """ :type : bool """ + self.json_str = None + """ :type : str """ + + def get_services(self): + """ + Provides the list of services on the topology + :return: The list of Service objects + :rtype: list[Service] + """ + return self.service_map.values() + + def get_service(self, service_name): + """ + Provides the service information for the given service name + :param str service_name: service name to be retrieved + :return: Service object of the service, None if the provided service name is invalid + :rtype: Service + """ + if service_name in self.service_map: + return self.service_map[service_name] + + return None + + def add_service(self, service): + """ + Adds a service to the list of services on the topology + + :param Service service: + :return: void + """ + self.service_map[service.service_name] = service + + def add_services(self, services): + """ + + :param list[Service] services: + :return: void + """ + for service in services: + self.add_service(service) + + def remove_service(self, service_name): + """ + Removes the service of the provided service name + :param str service_name: + :return: void + """ + if service_name in self.service_map: + self.service_map.pop(service_name) + + def service_exists(self, service_name): + """ + Checks if the service of the provided service name exists + :param str service_name: + :return: True if the service exists, False if otherwise + :rtype: bool + """ + return service_name in self.service_map + + def clear(self): + """ + Clears the service information list + :return: void + """ + self.service_map = {} + + def __str__(self): + """ + to string override + :return: + """ + return "Topology [serviceMap= %r , initialized= %r ]" % (self.service_map, self.initialized) + + +class Service: + """ + Represents a service on the topology + """ + + def __init__(self, service_name, service_type): + self.service_name = service_name + """ :type : str """ + self.service_type = service_type + """ :type : str """ + self.cluster_id_cluster_map = {} + """ :type : dict[str, Cluster] """ + self.port_map = {} + """ :type : dict[str, Port] """ + self.properties = {} + """ :type : dict[str, str] """ + + def get_clusters(self): + """ + Provides the list of clusters in the particular service + :return: The list of Cluster objects + :rtype: list[Cluster] + """ + return self.cluster_id_cluster_map.values() + + def add_cluster(self, cluster): + """ + Adds a cluster to the service + :param Cluster cluster: the cluster to be added + :return: void + """ + self.cluster_id_cluster_map[cluster.cluster_id] = cluster + + def remove_cluster(self, cluster_id): + if cluster_id in self.cluster_id_cluster_map: + self.cluster_id_cluster_map.pop(cluster_id) + + def cluster_exists(self, cluster_id): + """ + Checks if the cluster with the given cluster id exists for ther service + :param str cluster_id: + :return: True if the cluster for the given cluster id exists, False if otherwise + :rtype: bool + """ + return cluster_id in self.cluster_id_cluster_map + + def get_cluster(self, cluster_id): + """ + Provides the Cluster information for the provided cluster id + :param str cluster_id: the cluster id to search for + :return: Cluster object for the given cluster id, None if the cluster id is invalid + :rtype: Cluster + """ + if cluster_id in self.cluster_id_cluster_map: + return self.cluster_id_cluster_map[cluster_id] + + return None + + def get_ports(self): + """ + Returns the list of ports in the particular service + :return: The list of Port object + :rtype: list[Port] + """ + return self.port_map.values() + + def get_port(self, proxy_port): + """ + Provides the port information for the provided proxy port + :param str proxy_port: + :return: Port object for the provided port, None if port is invalid + :rtype: Port + """ + if proxy_port in self.port_map: + return self.port_map[proxy_port] + + return None + + def add_port(self, port): + self.port_map[port.proxy] = port + + def add_ports(self, ports): + for port in ports: + self.add_port(port) + + +class Cluster: + """ + Represents a cluster for a service + """ + + def __init__(self, service_name="", cluster_id="", deployment_policy_name="", autoscale_policy_name=""): + self.service_name = service_name + """ :type : str """ + self.cluster_id = cluster_id + """ :type : str """ + self.deployment_policy_name = deployment_policy_name + """ :type : str """ + self.autoscale_policy_name = autoscale_policy_name + """ :type : str """ + self.hostnames = [] + """ :type : list[str] """ + self.member_map = {} + """ :type : dict[str, Member] """ + + self.tenant_range = None + """ :type : str """ + self.is_lb_cluster = False + """ :type : bool """ + self.is_kubernetes_cluster = False + """ :type : bool """ + # self.status = None + # """ :type : str """ + self.load_balancer_algorithm_name = None + """ :type : str """ + self.properties = {} + """ :type : dict[str, str] """ + self.member_list_json = None + """ :type : str """ + self.app_id = "" + """ :type : str """ + # Not relevant to cartridge agent + # self.instance_id_instance_context_map = {} + # """ :type : dict[str, ClusterInstance] """ + + def add_hostname(self, hostname): + self.hostnames.append(hostname) + + def set_tenant_range(self, tenant_range): + Cluster.validate_tenant_range(tenant_range) + self.tenant_range = tenant_range + + def get_members(self): + """ + Provides the list of member information in the cluster + :return: The list of Member object + :rtype: list[Member] + """ + return self.member_map.values() + + def add_member(self, member): + self.member_map[member.member_id] = member + + def remove_member(self, member_id): + if self.member_exists(member_id): + self.member_map.pop(member_id) + + def get_member(self, member_id): + """ + Provides the member information for the provided member id + :param str member_id: + :return: Member object for the provided member id, None if member id is invalid + :rtype: Member + """ + if self.member_exists(member_id): + return self.member_map[member_id] + + return None + + def member_exists(self, member_id): + """ + Checks if the member for the provided member id exists in this cluster + :param str member_id: member id to be searched + :return: True if the member exists, False if otherwise + :rtype: bool + """ + return member_id in self.member_map + + def __str__(self): + return "Cluster [serviceName=" + self.service_name + ", clusterId=" + self.cluster_id \ + + ", autoscalePolicyName=" + self.autoscale_policy_name + ", deploymentPolicyName=" \ + + self.deployment_policy_name + ", hostNames=" + self.hostnames + ", tenantRange=" + self.tenant_range \ + + ", isLbCluster=" + self.is_lb_cluster + ", properties=" + self.properties + "]" + + def tenant_id_in_range(self, tenant_id): + """ + Check whether a given tenant id is in tenant range of the cluster. + :param str tenant_id: tenant id to be checked + :return: True if the tenant id is in tenant id range, False if otherwise + :rtype: bool + """ + if self.tenant_range is None: + return False + + if self.tenant_range == "*": + return True + else: + arr = self.tenant_range.split(constants.TENANT_RANGE_DELIMITER) + tenant_start = int(arr[0]) + if tenant_start <= tenant_id: + tenant_end = arr[1] + if tenant_end == "*": + return True + else: + if tenant_id <= int(tenant_end): + return True + + return False + + @staticmethod + def validate_tenant_range(tenant_range): + """ + Validates the tenant range to be either '*' or a delimeted range of numbers + :param str tenant_range: The tenant range string to be validated + :return: void if the provided tenant range is valid, RuntimeError if otherwise + :exception: RuntimeError if the tenant range is invalid + """ + valid = False + if tenant_range == "*": + valid = True + else: + arr = tenant_range.split(constants.TENANT_RANGE_DELIMITER) + if len(arr) == 2: + if arr[0].isdigit() and arr[1].isdigit(): + valid = True + elif arr[0].isdigit() and arr[1] == "*": + valid = True + + if not valid: + raise RuntimeError("Tenant range %r is not valid" % tenant_range) + + +class Member: + """ + Represents a member on a particular cluster + """ + + def __init__(self, service_name="", cluster_id="", network_partition_id="", partition_id="", member_id="", + cluster_instance_id=""): + self.service_name = service_name + """ :type : str """ + self.cluster_id = cluster_id + """ :type : str """ + self.network_partition_id = network_partition_id + """ :type : str """ + self.cluster_instance_id = cluster_instance_id + """ :type : str """ + self.partition_id = partition_id + """ :type : str """ + self.member_id = member_id + """ :type : str """ + self.port_map = {} + """ :type : dict[str, Port] """ + self.init_time = None + """ :type : int """ + + self.member_public_ips = None + """ :type : str """ + self.member_default_public_ip = None + """ :type : str """ + self.status = None + """ :type : str """ + self.member_private_ips = None + """ :type : str """ + self.member_default_private_ip = None + """ :type : str """ + self.properties = {} + """ :type : dict[str, str] """ + self.lb_cluster_id = None + """ :type : str """ + self.json_str = None + """ :type : str """ + + def is_active(self): + """ + Checks if the member is in active state + :return: True if active, False if otherwise + :rtype: bool + """ + return self.status == MemberStatus.Active + + def get_ports(self): + """ + Provides the list of the ports in the member + :return: List of Port objects + :rtype: list[Port] + """ + return self.port_map.values() + + def get_port(self, proxy): + """ + Provides the port information for the given port id + :param str proxy: The port id + :return: Port object of the provided port id, None if otherwise + :rtype: Port + """ + if proxy in self.port_map: + return self.port_map[proxy] + + return None + + def add_port(self, port): + self.port_map[port.proxy] = port + + def add_ports(self, ports): + for port in ports: + self.add_port(port) + + +class Port: + """ + Represents a port on a particular member + """ + + def __init__(self, protocol, value, proxy): + self.protocol = protocol + """ :type : str """ + self.value = value + """ :type : str """ + self.proxy = proxy + """ :type : str """ + + def __str__(self): + return "Port [protocol=%r, value=%r proxy=%r]" % (self.protocol, self.value, self.proxy) + + +class ServiceType: + """ + ServiceType enum + """ + SingleTenant = 1 + MultiTenant = 2 + + +class ClusterStatus: + """ + ClusterStatus enum + """ + Created = 1 + In_Maintenance = 2 + Removed = 3 + + +class MemberStatus: + """ + MemberStatus enum + """ + Created = "Created" + Initialized = "Initialized" + Starting = "Starting" + Active = "Active" + In_Maintenance = "In_Maintenance" + ReadyToShutDown = "ReadyToShutDown" + Suspended = "Suspended" + Terminated = "Terminated" + + +class TopologyContext: + """ + Handles and maintains a model of the topology provided by the Cloud Controller + """ + topology = Topology() + + @staticmethod + def get_topology(): + if TopologyContext.topology is None: + TopologyContext.topology = Topology() + return TopologyContext.topology + + @staticmethod + def update(topology): + TopologyContext.topology = topology + TopologyContext.topology.initialized = True + + +class Tenant: + """ + Object type representing the tenant details of a single tenant + """ + + def __init__(self, tenant_id, tenant_domain): + self.tenant_id = tenant_id + """ :type : int """ + self.tenant_domain = tenant_domain + """ :type : str """ + self.service_name_subscription_map = {} + """ :type : dict[str, Subscription] """ + + def get_subscription(self, service_name): + """ + Returns the Subscription object related to the provided service name + :param str service_name: service name to be retrieved + :return: Subscription of the service or None if the service name doesn't exist + :rtype: Subscription + """ + if service_name in self.service_name_subscription_map: + return self.service_name_subscription_map[service_name] + + return None + + def is_subscribed(self, service_name): + """ + Checks if the given service name has a subscription from this tenant + :param str service_name: name of the service to check + :return: True if the tenant is subscribed to the given service name, False if not + :rtype: bool + """ + return service_name in self.service_name_subscription_map + + def add_subscription(self, subscription): + """ + Adds a subscription information entry on the subscription list for this tenant + :param Subscription subscription: Subscription information to be added + :return: void + :rtype: void + """ + self.service_name_subscription_map[subscription.service_name] = subscription + + def remove_subscription(self, service_name): + """ + Removes the specified subscription details from the subscription list + :param str service_name: The service name of the subscription to be removed + :return: void + :rtype: void + """ + if service_name in self.service_name_subscription_map: + self.service_name_subscription_map.pop(service_name) + + +class Subscription: + """ + Subscription information of a particular subscription to a service + """ + + def __init__(self, service_name, cluster_ids): + self.service_name = service_name + """ :type : str """ + self.cluster_ids = cluster_ids + """ :type : list[str] """ + self.subscription_domain_map = {} + """ :type : dict[str, SubscriptionDomain] """ + + def add_subscription_domain(self, domain_name, application_context): + """ + Adds a subscription domain + :param str domain_name: + :param str application_context: + :return: void + :rtype: void + """ + self.subscription_domain_map[domain_name] = SubscriptionDomain(domain_name, application_context) + + def remove_subscription_domain(self, domain_name): + """ + Removes the subscription domain of the specified domain name + :param str domain_name: + :return: void + :rtype: void + """ + if domain_name in self.subscription_domain_map: + self.subscription_domain_map.pop(domain_name) + + def subscription_domain_exists(self, domain_name): + """ + Returns the SubscriptionDomain information of the specified domain name + :param str domain_name: + :return: SubscriptionDomain + :rtype: SubscriptionDomain + """ + return domain_name in self.subscription_domain_map + + def get_subscription_domains(self): + """ + Returns the list of subscription domains of this subscription + :return: List of SubscriptionDomain objects + :rtype: list[SubscriptionDomain] + """ + return self.subscription_domain_map.values() + + +class SubscriptionDomain: + """ + Represents a Subscription Domain + """ + + def __init__(self, domain_name, application_context): + self.domain_name = domain_name + """ :type : str """ + self.application_context = application_context + """ :type : str """ + + +class TenantContext: + """ + Handles and maintains a model of all the information related to tenants within this instance + """ + tenants = {} + initialized = False + tenant_domains = {"carbon.super": Tenant(-1234, "carbon.super")} + + @staticmethod + def add_tenant(tenant): + TenantContext.tenants[tenant.tenant_id] = tenant + TenantContext.tenant_domains[tenant.tenant_domain] = tenant + + @staticmethod + def remove_tenant(tenant_id): + if tenant_id in TenantContext.tenants: + tenant = TenantContext.get_tenant(tenant_id) + TenantContext.tenants.pop(tenant.tenant_id) + TenantContext.tenant_domains.pop(tenant.tenant_domain) + + @staticmethod + def update(tenants): + for tenant in tenants: + TenantContext.add_tenant(tenant) + + @staticmethod + def get_tenant(tenant_id): + """ + Gets the Tenant object of the provided tenant ID + :param int tenant_id: + :return: Tenant object of the provided tenant ID + :rtype: Tenant + """ + if tenant_id in TenantContext.tenants: + return TenantContext.tenants[tenant_id] + + return None + + @staticmethod + def get_tenant_by_domain(tenant_domain): + """ + Gets the Tenant object of the provided tenant domain + :param str tenant_domain: + :return: Tenant object of the provided tenant domain + :rtype: str + """ + if tenant_domain in TenantContext.tenant_domains: + return TenantContext.tenant_domains[tenant_domain] + + return None \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py new file mode 100644 index 0000000..e7ab1c7 --- /dev/null +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py @@ -0,0 +1,287 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import datetime +from threading import Thread, current_thread + +from modules.databridge.agent import * +from config import Config +from modules.util import cartridgeagentutils +from exception import DataPublisherException +import constants + + +class LogPublisher(Thread): + + def __init__(self, file_path, stream_definition, tenant_id, alias, date_time, member_id): + Thread.__init__(self) + self.log = LogFactory().get_log(__name__) + self.file_path = file_path + self.thrift_publisher = ThriftPublisher( + DataPublisherConfiguration.get_instance().monitoring_server_ip, + DataPublisherConfiguration.get_instance().monitoring_server_secure_port, + DataPublisherConfiguration.get_instance().admin_username, + DataPublisherConfiguration.get_instance().admin_password, + stream_definition) + self.tenant_id = tenant_id + self.alias = alias + self.date_time = date_time + self.member_id = member_id + + self.terminated = False + + def run(self): + if os.path.isfile(self.file_path) and os.access(self.file_path, os.R_OK): + self.log.info("Starting log publisher for file: " + self.file_path + ", thread: " + str(current_thread())) + # open file and keep reading for new entries + # with open(self.file_path, "r") as read_file: + read_file = open(self.file_path, "r") + read_file.seek(os.stat(self.file_path)[6]) # go to the end of the file + + while not self.terminated: + where = read_file.tell() # where the seeker is in the file + line = read_file.readline() # read the current line + if not line: + # no new line entered + self.log.debug("No new log entries detected to publish.") + time.sleep(1) + read_file.seek(where) # set seeker + else: + # new line detected, create event object + self.log.debug("Log entry/entries detected. Publishing to monitoring server.") + event = ThriftEvent() + event.metaData.append(self.member_id) + event.payloadData.append(self.tenant_id) + event.payloadData.append(self.alias) + event.payloadData.append("") + event.payloadData.append(self.date_time) + event.payloadData.append("") + event.payloadData.append(line) + event.payloadData.append("") + event.payloadData.append("") + event.payloadData.append(self.member_id) + event.payloadData.append("") + + self.thrift_publisher.publish(event) + self.log.debug("Log event published.") + + self.thrift_publisher.disconnect() # disconnect the publisher upon being terminated + self.log.debug("Log publisher for path \"%s\" terminated" % self.file_path) + else: + raise DataPublisherException("Unable to read the file at path \"%s\"" % self.file_path) + + def terminate(self): + """ + Allows the LogPublisher thread to be terminated to stop publishing to BAM/CEP. Allow a minimum of 1 second delay + to take effect. + """ + self.terminated = True + + +class LogPublisherManager(Thread): + """ + A log publishing thread management thread which maintains a log publisher for each log file. Also defines a stream + definition and the BAM/CEP server information for a single publishing context. + """ + + @staticmethod + def define_stream(tenant_id, alias, date_time): + """ + Creates a stream definition for Log Publishing + :return: A StreamDefinition object with the required attributes added + :rtype : StreamDefinition + """ + # stream definition + stream_definition = StreamDefinition() + stream_name = "logs." + tenant_id + "." + alias + "." + date_time + stream_version = "1.0.0" + stream_nickname = "log entries from instance" + stream_description = "Apache Stratos Instance Log Publisher" + + stream_definition.name = stream_name + stream_definition.version = stream_version + stream_definition.description = stream_description + stream_definition.nickname = stream_nickname + stream_definition.add_metadata_attribute("memberId", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("tenantID", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("serverName", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("appName", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("logTime", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("priority", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("message", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("logger", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("ip", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("instance", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("stacktrace", StreamDefinition.STRING) + + return stream_definition + + def __init__(self, logfile_paths): + Thread.__init__(self) + + self.log = LogFactory().get_log(__name__) + + self.logfile_paths = logfile_paths + self.publishers = {} + self.ports = [] + self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_port) + self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_secure_port) + + self.log.debug("Checking if Monitoring server is active.") + ports_active = cartridgeagentutils.wait_until_ports_active( + DataPublisherConfiguration.get_instance().monitoring_server_ip, + self.ports, + int(Config.read_property("port.check.timeout", critical=False))) + + if not ports_active: + self.log.debug("Monitoring server is not active") + raise DataPublisherException("Monitoring server not active, data publishing is aborted") + + self.log.debug("Monitoring server is up and running. Log Publisher Manager started.") + + self.tenant_id = LogPublisherManager.get_valid_tenant_id(Config.tenant_id) + self.alias = LogPublisherManager.get_alias(Config.cluster_id) + self.date_time = LogPublisherManager.get_current_date() + + self.stream_definition = self.define_stream(self.tenant_id, self.alias, self.date_time) + + def run(self): + if self.logfile_paths is not None and len(self.logfile_paths): + for log_path in self.logfile_paths: + # thread for each log file + publisher = self.get_publisher(log_path) + publisher.start() + self.log.debug("Log publisher for path \"%s\" started." % log_path) + + def get_publisher(self, log_path): + """ + Retrieve the publisher for the specified log file path. Creates a new LogPublisher if one is not available + :return: The LogPublisher object + :rtype : LogPublisher + """ + if log_path not in self.publishers: + self.log.debug("Creating a Log publisher for path \"%s\"" % log_path) + self.publishers[log_path] = LogPublisher( + log_path, + self.stream_definition, + self.tenant_id, + self.alias, + self.date_time, + Config.member_id) + + return self.publishers[log_path] + + def terminate_publisher(self, log_path): + """ + Terminates the LogPublisher thread associated with the specified log file + """ + if log_path in self.publishers: + self.publishers[log_path].terminate() + + def terminate_all_publishers(self): + """ + Terminates all LogPublisher threads + """ + for publisher in self.publishers: + publisher.terminate() + + @staticmethod + def get_valid_tenant_id(tenant_id): + if tenant_id == constants.INVALID_TENANT_ID or tenant_id == constants.SUPER_TENANT_ID: + return "0" + + return tenant_id + + @staticmethod + def get_alias(cluster_id): + try: + alias = cluster_id.split("\\.")[0] + except: + alias = cluster_id + + return alias + + @staticmethod + def get_current_date(): + """ + Returns the current date formatted as yyyy-MM-dd + :return: Formatted date string + :rtype : str + """ + return datetime.date.today().strftime(constants.DATE_FORMAT) + + +class DataPublisherConfiguration: + """ + A singleton implementation to access configuration information for data publishing to BAM/CEP + TODO: get rid of this + """ + + __instance = None + log = LogFactory().get_log(__name__) + + @staticmethod + def get_instance(): + """ + Singleton instance retriever + :return: Instance + :rtype : DataPublisherConfiguration + """ + if DataPublisherConfiguration.__instance is None: + DataPublisherConfiguration.__instance = DataPublisherConfiguration() + + return DataPublisherConfiguration.__instance + + def __init__(self): + self.enabled = False + self.monitoring_server_ip = None + self.monitoring_server_port = None + self.monitoring_server_secure_port = None + self.admin_username = None + self.admin_password = None + + self.read_config() + + def read_config(self): + self.enabled = Config.read_property(constants.MONITORING_PUBLISHER_ENABLED, False) + if not self.enabled: + DataPublisherConfiguration.log.info("Data Publisher disabled") + return + + DataPublisherConfiguration.log.info("Data Publisher enabled") + + self.monitoring_server_ip = Config.read_property(constants.MONITORING_RECEIVER_IP, False) + if self.monitoring_server_ip is None: + raise RuntimeError("System property not found: " + constants.MONITORING_RECEIVER_IP) + + self.monitoring_server_port = Config.read_property(constants.MONITORING_RECEIVER_PORT, False) + if self.monitoring_server_port is None: + raise RuntimeError("System property not found: " + constants.MONITORING_RECEIVER_PORT) + + self.monitoring_server_secure_port = Config.read_property("monitoring.server.secure.port", False) + if self.monitoring_server_secure_port is None: + raise RuntimeError("System property not found: monitoring.server.secure.port") + + self.admin_username = Config.read_property(constants.MONITORING_SERVER_ADMIN_USERNAME, False) + if self.admin_username is None: + raise RuntimeError("System property not found: " + constants.MONITORING_SERVER_ADMIN_USERNAME) + + self.admin_password = Config.read_property(constants.MONITORING_SERVER_ADMIN_PASSWORD, False) + if self.admin_password is None: + raise RuntimeError("System property not found: " + constants.MONITORING_SERVER_ADMIN_PASSWORD) + + DataPublisherConfiguration.log.info("Data Publisher configuration initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/__init__.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/__init__.py deleted file mode 100644 index a595c84..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/logpublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/logpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/logpublisher.py deleted file mode 100644 index e38ae16..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/logpublisher.py +++ /dev/null @@ -1,289 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import datetime -from threading import Thread, current_thread - -from ..databridge.agent import * -from config import Config -from ..util import cartridgeagentutils -from exception import DataPublisherException -import constants - - -class LogPublisher(Thread): - - def __init__(self, file_path, stream_definition, tenant_id, alias, date_time, member_id): - Thread.__init__(self) - self.log = LogFactory().get_log(__name__) - self.file_path = file_path - self.thrift_publisher = ThriftPublisher( - DataPublisherConfiguration.get_instance().monitoring_server_ip, - DataPublisherConfiguration.get_instance().monitoring_server_secure_port, - DataPublisherConfiguration.get_instance().admin_username, - DataPublisherConfiguration.get_instance().admin_password, - stream_definition) - self.tenant_id = tenant_id - self.alias = alias - self.date_time = date_time - self.member_id = member_id - - self.terminated = False - - def run(self): - if os.path.isfile(self.file_path) and os.access(self.file_path, os.R_OK): - self.log.info("Starting log publisher for file: " + self.file_path + ", thread: " + str(current_thread())) - # open file and keep reading for new entries - # with open(self.file_path, "r") as read_file: - read_file = open(self.file_path, "r") - read_file.seek(os.stat(self.file_path)[6]) # go to the end of the file - - while not self.terminated: - where = read_file.tell() # where the seeker is in the file - line = read_file.readline() # read the current line - if not line: - # no new line entered - self.log.debug("No new log entries detected to publish.") - time.sleep(1) - read_file.seek(where) # set seeker - else: - # new line detected, create event object - self.log.debug("Log entry/entries detected. Publishing to monitoring server.") - event = ThriftEvent() - event.metaData.append(self.member_id) - event.payloadData.append(self.tenant_id) - event.payloadData.append(self.alias) - event.payloadData.append("") - event.payloadData.append(self.date_time) - event.payloadData.append("") - event.payloadData.append(line) - event.payloadData.append("") - event.payloadData.append("") - event.payloadData.append(self.member_id) - event.payloadData.append("") - - self.thrift_publisher.publish(event) - self.log.debug("Log event published.") - - self.thrift_publisher.disconnect() # disconnect the publisher upon being terminated - self.log.debug("Log publisher for path \"%s\" terminated" % self.file_path) - else: - raise DataPublisherException("Unable to read the file at path \"%s\"" % self.file_path) - - def terminate(self): - """ - Allows the LogPublisher thread to be terminated to stop publishing to BAM/CEP. Allow a minimum of 1 second delay - to take effect. - """ - self.terminated = True - - -class LogPublisherManager(Thread): - """ - A log publishing thread management thread which maintains a log publisher for each log file. Also defines a stream - definition and the BAM/CEP server information for a single publishing context. - """ - - @staticmethod - def define_stream(tenant_id, alias, date_time): - """ - Creates a stream definition for Log Publishing - :return: A StreamDefinition object with the required attributes added - :rtype : StreamDefinition - """ - # stream definition - stream_definition = StreamDefinition() - stream_name = "logs." + tenant_id + "." + alias + "." + date_time - stream_version = "1.0.0" - stream_nickname = "log entries from instance" - stream_description = "Apache Stratos Instance Log Publisher" - - stream_definition.name = stream_name - stream_definition.version = stream_version - stream_definition.description = stream_description - stream_definition.nickname = stream_nickname - stream_definition.add_metadata_attribute("memberId", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("tenantID", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("serverName", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("appName", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("logTime", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("priority", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("message", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("logger", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("ip", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("instance", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("stacktrace", StreamDefinition.STRING) - - return stream_definition - - def __init__(self, logfile_paths): - Thread.__init__(self) - - self.log = LogFactory().get_log(__name__) - - self.logfile_paths = logfile_paths - self.publishers = {} - self.ports = [] - self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_port) - self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_secure_port) - - self.cartridge_agent_config = Config - - self.log.debug("Checking if Monitoring server is active.") - ports_active = cartridgeagentutils.wait_until_ports_active( - DataPublisherConfiguration.get_instance().monitoring_server_ip, - self.ports, - int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) - - if not ports_active: - self.log.debug("Monitoring server is not active") - raise DataPublisherException("Monitoring server not active, data publishing is aborted") - - self.log.debug("Monitoring server is up and running. Log Publisher Manager started.") - - self.tenant_id = LogPublisherManager.get_valid_tenant_id(Config.tenant_id) - self.alias = LogPublisherManager.get_alias(Config.cluster_id) - self.date_time = LogPublisherManager.get_current_date() - - self.stream_definition = self.define_stream(self.tenant_id, self.alias, self.date_time) - - def run(self): - if self.logfile_paths is not None and len(self.logfile_paths): - for log_path in self.logfile_paths: - # thread for each log file - publisher = self.get_publisher(log_path) - publisher.start() - self.log.debug("Log publisher for path \"%s\" started." % log_path) - - def get_publisher(self, log_path): - """ - Retrieve the publisher for the specified log file path. Creates a new LogPublisher if one is not available - :return: The LogPublisher object - :rtype : LogPublisher - """ - if log_path not in self.publishers: - self.log.debug("Creating a Log publisher for path \"%s\"" % log_path) - self.publishers[log_path] = LogPublisher( - log_path, - self.stream_definition, - self.tenant_id, - self.alias, - self.date_time, - self.cartridge_agent_config.member_id) - - return self.publishers[log_path] - - def terminate_publisher(self, log_path): - """ - Terminates the LogPublisher thread associated with the specified log file - """ - if log_path in self.publishers: - self.publishers[log_path].terminate() - - def terminate_all_publishers(self): - """ - Terminates all LogPublisher threads - """ - for publisher in self.publishers: - publisher.terminate() - - @staticmethod - def get_valid_tenant_id(tenant_id): - if tenant_id == constants.INVALID_TENANT_ID or tenant_id == constants.SUPER_TENANT_ID: - return "0" - - return tenant_id - - @staticmethod - def get_alias(cluster_id): - try: - alias = cluster_id.split("\\.")[0] - except: - alias = cluster_id - - return alias - - @staticmethod - def get_current_date(): - """ - Returns the current date formatted as yyyy-MM-dd - :return: Formatted date string - :rtype : str - """ - return datetime.date.today().strftime(constants.DATE_FORMAT) - - -class DataPublisherConfiguration: - """ - A singleton implementation to access configuration information for data publishing to BAM/CEP - TODO: get rid of this - """ - - __instance = None - log = LogFactory().get_log(__name__) - - @staticmethod - def get_instance(): - """ - Singleton instance retriever - :return: Instance - :rtype : DataPublisherConfiguration - """ - if DataPublisherConfiguration.__instance is None: - DataPublisherConfiguration.__instance = DataPublisherConfiguration() - - return DataPublisherConfiguration.__instance - - def __init__(self): - self.enabled = False - self.monitoring_server_ip = None - self.monitoring_server_port = None - self.monitoring_server_secure_port = None - self.admin_username = None - self.admin_password = None - - self.read_config() - - def read_config(self): - self.enabled = Config.read_property(constants.MONITORING_PUBLISHER_ENABLED, False) - if not self.enabled: - DataPublisherConfiguration.log.info("Data Publisher disabled") - return - - DataPublisherConfiguration.log.info("Data Publisher enabled") - - self.monitoring_server_ip = Config.read_property(constants.MONITORING_RECEIVER_IP, False) - if self.monitoring_server_ip is None: - raise RuntimeError("System property not found: " + constants.MONITORING_RECEIVER_IP) - - self.monitoring_server_port = Config.read_property(constants.MONITORING_RECEIVER_PORT, False) - if self.monitoring_server_port is None: - raise RuntimeError("System property not found: " + constants.MONITORING_RECEIVER_PORT) - - self.monitoring_server_secure_port = Config.read_property("monitoring.server.secure.port", False) - if self.monitoring_server_secure_port is None: - raise RuntimeError("System property not found: monitoring.server.secure.port") - - self.admin_username = Config.read_property(constants.MONITORING_SERVER_ADMIN_USERNAME, False) - if self.admin_username is None: - raise RuntimeError("System property not found: " + constants.MONITORING_SERVER_ADMIN_USERNAME) - - self.admin_password = Config.read_property(constants.MONITORING_SERVER_ADMIN_PASSWORD, False) - if self.admin_password is None: - raise RuntimeError("System property not found: " + constants.MONITORING_SERVER_ADMIN_PASSWORD) - - DataPublisherConfiguration.log.info("Data Publisher configuration initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py index c75acaf..37e2b3c 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py @@ -22,9 +22,8 @@ from ..util import cartridgeagentutils from ..artifactmgt.git.agentgithandler import * from ..artifactmgt.repository import Repository from config import Config -from ..publisher import cartridgeagentpublisher -from ..topology.topologycontext import * -from ..tenant.tenantcontext import * +import publisher +from entity import * from ..util.log import LogFactory import constants @@ -104,7 +103,7 @@ class EventHandler: if subscribe_run: # publish instanceActivated - cartridgeagentpublisher.publish_instance_activated_event(Config.health_stat_plugin) + publisher.publish_instance_activated_event(Config.health_stat_plugin) elif updated: # updated on pull self.on_artifact_update_scheduler_event(tenant_id) @@ -152,7 +151,7 @@ class EventHandler: member_activated_event.cluster_id, member_activated_event.member_id)) - member_initialized = self.check_member_state_in_topology( + member_initialized = self.is_member_initialized_in_topology( member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id) @@ -170,7 +169,7 @@ class EventHandler: cluster_id_in_payload = Config.cluster_id member_id_in_payload = Config.member_id - member_initialized = self.check_member_state_in_topology( + member_initialized = self.is_member_initialized_in_topology( service_name_in_payload, cluster_id_in_payload, member_id_in_payload) @@ -226,7 +225,7 @@ class EventHandler: (member_terminated_event.service_name, member_terminated_event.cluster_id, member_terminated_event.member_id)) - member_initialized = self.check_member_state_in_topology( + member_initialized = self.is_member_initialized_in_topology( member_terminated_event.service_name, member_terminated_event.cluster_id, member_terminated_event.member_id @@ -243,7 +242,7 @@ class EventHandler: (member_suspended_event.service_name, member_suspended_event.cluster_id, member_suspended_event.member_id)) - member_initialized = self.check_member_state_in_topology( + member_initialized = self.is_member_initialized_in_topology( member_suspended_event.service_name, member_suspended_event.cluster_id, member_suspended_event.member_id @@ -260,7 +259,7 @@ class EventHandler: (member_started_event.service_name, member_started_event.cluster_id, member_started_event.member_id)) - member_initialized = self.check_member_state_in_topology( + member_initialized = self.is_member_initialized_in_topology( member_started_event.service_name, member_started_event.cluster_id, member_started_event.member_id @@ -278,7 +277,7 @@ class EventHandler: cluster_id_in_payload = Config.cluster_id member_id_in_payload = Config.member_id - member_initialized = self.check_member_state_in_topology(service_name_in_payload, cluster_id_in_payload, + member_initialized = self.is_member_initialized_in_topology(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) if not member_initialized: @@ -353,13 +352,13 @@ class EventHandler: def cleanup(self, event): self.__log.info("Executing cleaning up the data in the cartridge instance...") - cartridgeagentpublisher.publish_maintenance_mode_event() + publisher.publish_maintenance_mode_event() self.execute_event_extendables("clean", {}) self.__log.info("cleaning up finished in the cartridge instance...") self.__log.info("publishing ready to shutdown event...") - cartridgeagentpublisher.publish_instance_ready_to_shutdown_event() + publisher.publish_instance_ready_to_shutdown_event() def execute_event_extendables(self, event, input_values): """ Execute the extensions and plugins related to the event @@ -471,28 +470,16 @@ class EventHandler: self.__log.debug("Repo path returned : %r" % repo_path) return repo_path - def check_member_state_in_topology(self, service_name, cluster_id, member_id): - # TODO: refactor - topology = TopologyContext.get_topology() - service = topology.get_service(service_name) - if service is None: - self.__log.error("Service not found in topology [service] %s" % service_name) - return False - - cluster = service.get_cluster(cluster_id) - if cluster is None: - self.__log.error("Cluster id not found in topology [cluster] %s" % cluster_id) - return False + def is_member_initialized_in_topology(self, service_name, cluster_id, member_id): + if self.member_exists_in_topology(service_name, cluster_id, member_id): + topology = TopologyContext.get_topology() + service = topology.get_service(service_name) + cluster = service.get_cluster(cluster_id) + found_member = cluster.get_member(member_id) + if found_member.status == MemberStatus.Initialized: + return True - activated_member = cluster.get_member(member_id) - if activated_member is None: - self.__log.error("Member id not found in topology [member] %s" % member_id) - return False - - if activated_member.status != MemberStatus.Initialized: - return False - - return True + return False def member_exists_in_topology(self, service_name, cluster_id, member_id): topology = TopologyContext.get_topology() http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py index 0b58dbb..2e287bd 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py @@ -16,7 +16,7 @@ # under the License. import json -from ... tenant.tenantcontext import * +from entity import * class CompleteTenantEvent: http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/topology/events.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/topology/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/topology/events.py index 07151c2..e11daab 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/topology/events.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/topology/events.py @@ -17,7 +17,7 @@ import json -from ... topology.topologycontext import * +from entity import * class MemberActivatedEvent: http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/__init__.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py deleted file mode 100644 index 4891800..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py +++ /dev/null @@ -1,215 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import paho.mqtt.publish as publish - -from .. event.instance.status.events import * -from .. util.log import * -from .. util import cartridgeagentutils -import healthstats -import constants -from config import Config - - -log = LogFactory().get_log(__name__) - -started = False -activated = False -ready_to_shutdown = False -maintenance = False - -publishers = {} -""" :type : dict[str, EventPublisher] """ - - -def publish_instance_started_event(): - global started, log - if not started: - log.info("Publishing instance started event") - - application_id = Config.application_id - service_name = Config.service_name - cluster_id = Config.cluster_id - member_id = Config.member_id - instance_id = Config.instance_id - cluster_instance_id = Config.cluster_instance_id - network_partition_id = Config.network_partition_id - partition_id = Config.partition_id - - instance_started_event = InstanceStartedEvent( - application_id, - service_name, - cluster_id, - cluster_instance_id, - member_id, - instance_id, - network_partition_id, - partition_id) - - publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_STARTED_EVENT) - publisher.publish(instance_started_event) - started = True - log.info("Instance started event published") - else: - log.warn("Instance already started") - - -def publish_instance_activated_event(health_stat_plugin): - global activated, log - if not activated: - # Wait for all ports to be active - listen_address = Config.listen_address - configuration_ports = Config.ports - ports_active = cartridgeagentutils.wait_until_ports_active( - listen_address, - configuration_ports, - int(Config.read_property("port.check.timeout", critical=False))) - - if ports_active: - log.info("Publishing instance activated event") - service_name = Config.service_name - cluster_id = Config.cluster_id - member_id = Config.member_id - instance_id = Config.instance_id - cluster_instance_id = Config.cluster_instance_id - network_partition_id = Config.network_partition_id - partition_id = Config.partition_id - - instance_activated_event = InstanceActivatedEvent( - service_name, - cluster_id, - cluster_instance_id, - member_id, - instance_id, - network_partition_id, - partition_id) - - publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_ACTIVATED_EVENT) - publisher.publish(instance_activated_event) - - log.info("Instance activated event published") - log.info("Starting health statistics notifier") - - health_stat_publishing_enabled = Config.read_property(constants.CEP_PUBLISHER_ENABLED, False) - - if health_stat_publishing_enabled: - interval_default = 15 # seconds - interval = Config.read_property("stats.notifier.interval", False) - if interval is not None and len(interval) > 0: - try: - interval = int(interval) - except ValueError: - interval = interval_default - else: - interval = interval_default - - health_stats_publisher = healthstats.HealthStatisticsPublisherManager(interval, health_stat_plugin) - log.info("Starting Health statistics publisher with interval %r" % interval) - health_stats_publisher.start() - else: - log.warn("Statistics publisher is disabled") - - activated = True - log.info("Health statistics notifier started") - else: - log.error("Ports activation timed out. Aborting InstanceActivatedEvent publishing [IPAddress] %s [Ports] %s" - % (listen_address, configuration_ports)) - else: - log.warn("Instance already activated") - - -def publish_maintenance_mode_event(): - global maintenance, log - if not maintenance: - log.info("Publishing instance maintenance mode event") - - service_name = Config.service_name - cluster_id = Config.cluster_id - member_id = Config.member_id - instance_id = Config.instance_id - cluster_instance_id = Config.cluster_instance_id - network_partition_id = Config.network_partition_id - partition_id = Config.partition_id - - instance_maintenance_mode_event = InstanceMaintenanceModeEvent( - service_name, - cluster_id, - cluster_instance_id, - member_id, - instance_id, - network_partition_id, - partition_id) - - publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_MAINTENANCE_MODE_EVENT) - publisher.publish(instance_maintenance_mode_event) - - maintenance = True - log.info("Instance Maintenance mode event published") - else: - log.warn("Instance already in a Maintenance mode...") - - -def publish_instance_ready_to_shutdown_event(): - global ready_to_shutdown, log - if not ready_to_shutdown: - log.info("Publishing instance activated event") - - service_name = Config.service_name - cluster_id = Config.cluster_id - member_id = Config.member_id - instance_id = Config.instance_id - cluster_instance_id = Config.cluster_instance_id - network_partition_id = Config.network_partition_id - partition_id = Config.partition_id - - instance_shutdown_event = InstanceReadyToShutdownEvent( - service_name, - cluster_id, - cluster_instance_id, - member_id, - instance_id, - network_partition_id, - partition_id) - - publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_READY_TO_SHUTDOWN_EVENT) - publisher.publish(instance_shutdown_event) - - ready_to_shutdown = True - log.info("Instance ReadyToShutDown event published") - else: - log.warn("Instance already in a ReadyToShutDown event...") - - -def get_publisher(topic): - if topic not in publishers: - publishers[topic] = EventPublisher(topic) - - return publishers[topic] - - -class EventPublisher: - """ - Handles publishing events to topics to the provided message broker - """ - def __init__(self, topic): - self.__topic = topic - - def publish(self, event): - mb_ip = Config.read_property(constants.MB_IP) - mb_port = Config.read_property(constants.MB_PORT) - payload = event.to_json() - publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/subscriber/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/subscriber/__init__.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/subscriber/__init__.py deleted file mode 100644 index 2456923..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/subscriber/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/subscriber/eventsubscriber.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/subscriber/eventsubscriber.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/subscriber/eventsubscriber.py deleted file mode 100644 index 4154d46..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/subscriber/eventsubscriber.py +++ /dev/null @@ -1,117 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from Queue import Queue - -import threading -import paho.mqtt.client as mqtt - - -class EventSubscriber(threading.Thread): - """ - Provides functionality to subscribe to a given topic on the Stratos MB and - register event handlers for various events. - """ - - def __init__(self, topic, ip, port): - threading.Thread.__init__(self) - - self.__event_queue = Queue(maxsize=0) - self.__event_executor = EventExecutor(self.__event_queue) - - self.log = LogFactory().get_log(__name__) - - self.__mb_client = None - self.__topic = topic - self.__subscribed = False - self.__ip = ip - self.__port = port - - def run(self): - # Start the event executor thread - self.__event_executor.start() - self.__mb_client = mqtt.Client() - self.__mb_client.on_connect = self.on_connect - self.__mb_client.on_message = self.on_message - - self.log.debug("Connecting to the message broker with address %r:%r" % (self.__ip, self.__port)) - self.__mb_client.connect(self.__ip, self.__port, 60) - self.__subscribed = True - self.__mb_client.loop_forever() - - def register_handler(self, event, handler): - """ - Adds an event handler function mapped to the provided event. - :param str event: Name of the event to attach the provided handler - :param handler: The handler function - :return: void - :rtype: void - """ - self.__event_executor.register_event_handler(event, handler) - self.log.debug("Registered handler for event %r" % event) - - def on_connect(self, client, userdata, flags, rc): - self.log.debug("Connected to message broker.") - self.__mb_client.subscribe(self.__topic) - self.log.debug("Subscribed to %r" % self.__topic) - - def on_message(self, client, userdata, msg): - self.log.debug("Message received: %s:\n%s" % (msg.topic, msg.payload)) - self.__event_queue.put(msg) - - def is_subscribed(self): - """ - Checks if this event subscriber is successfully subscribed to the provided topic - :return: True if subscribed, False if otherwise - :rtype: bool - """ - return self.__subscribed - - -class EventExecutor(threading.Thread): - """ - Polls the event queue and executes event handlers for each event - """ - def __init__(self, event_queue): - threading.Thread.__init__(self) - self.__event_queue = event_queue - # TODO: several handlers for one event - self.__event_handlers = {} - self.log = LogFactory().get_log(__name__) - - def run(self): - while True: - event_msg = self.__event_queue.get() - event = event_msg.topic.rpartition('/')[2] - if event in self.__event_handlers: - handler = self.__event_handlers[event] - try: - self.log.debug("Executing handler for event %r" % event) - handler(event_msg) - except: - self.log.exception("Error processing %r event" % event) - else: - - self.log.debug("Event handler not found for event : %r" % event) - - def register_event_handler(self, event, handler): - self.__event_handlers[event] = handler - - def terminate(self): - self.terminate() - - -from .. util.log import LogFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/tenant/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/tenant/__init__.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/tenant/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/tenant/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/tenant/tenantcontext.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/tenant/tenantcontext.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/tenant/tenantcontext.py deleted file mode 100644 index 5ed9d4e..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/tenant/tenantcontext.py +++ /dev/null @@ -1,185 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -class Tenant: - """ - Object type representing the tenant details of a single tenant - """ - - def __init__(self, tenant_id, tenant_domain): - self.tenant_id = tenant_id - """ :type : int """ - self.tenant_domain = tenant_domain - """ :type : str """ - self.service_name_subscription_map = {} - """ :type : dict[str, Subscription] """ - - def get_subscription(self, service_name): - """ - Returns the Subscription object related to the provided service name - :param str service_name: service name to be retrieved - :return: Subscription of the service or None if the service name doesn't exist - :rtype: Subscription - """ - if service_name in self.service_name_subscription_map: - return self.service_name_subscription_map[service_name] - - return None - - def is_subscribed(self, service_name): - """ - Checks if the given service name has a subscription from this tenant - :param str service_name: name of the service to check - :return: True if the tenant is subscribed to the given service name, False if not - :rtype: bool - """ - return service_name in self.service_name_subscription_map - - def add_subscription(self, subscription): - """ - Adds a subscription information entry on the subscription list for this tenant - :param Subscription subscription: Subscription information to be added - :return: void - :rtype: void - """ - self.service_name_subscription_map[subscription.service_name] = subscription - - def remove_subscription(self, service_name): - """ - Removes the specified subscription details from the subscription list - :param str service_name: The service name of the subscription to be removed - :return: void - :rtype: void - """ - if service_name in self.service_name_subscription_map: - self.service_name_subscription_map.pop(service_name) - - -class Subscription: - """ - Subscription information of a particular subscription to a service - """ - - def __init__(self, service_name, cluster_ids): - self.service_name = service_name - """ :type : str """ - self.cluster_ids = cluster_ids - """ :type : list[str] """ - self.subscription_domain_map = {} - """ :type : dict[str, SubscriptionDomain] """ - - def add_subscription_domain(self, domain_name, application_context): - """ - Adds a subscription domain - :param str domain_name: - :param str application_context: - :return: void - :rtype: void - """ - self.subscription_domain_map[domain_name] = SubscriptionDomain(domain_name, application_context) - - def remove_subscription_domain(self, domain_name): - """ - Removes the subscription domain of the specified domain name - :param str domain_name: - :return: void - :rtype: void - """ - if domain_name in self.subscription_domain_map: - self.subscription_domain_map.pop(domain_name) - - def subscription_domain_exists(self, domain_name): - """ - Returns the SubscriptionDomain information of the specified domain name - :param str domain_name: - :return: SubscriptionDomain - :rtype: SubscriptionDomain - """ - return domain_name in self.subscription_domain_map - - def get_subscription_domains(self): - """ - Returns the list of subscription domains of this subscription - :return: List of SubscriptionDomain objects - :rtype: list[SubscriptionDomain] - """ - return self.subscription_domain_map.values() - - -class SubscriptionDomain: - """ - Represents a Subscription Domain - """ - - def __init__(self, domain_name, application_context): - self.domain_name = domain_name - """ :type : str """ - self.application_context = application_context - """ :type : str """ - - -class TenantContext: - """ - Handles and maintains a model of all the information related to tenants within this instance - """ - tenants = {} - initialized = False - tenant_domains = {"carbon.super": Tenant(-1234, "carbon.super")} - - @staticmethod - def add_tenant(tenant): - TenantContext.tenants[tenant.tenant_id] = tenant - TenantContext.tenant_domains[tenant.tenant_domain] = tenant - - @staticmethod - def remove_tenant(tenant_id): - if tenant_id in TenantContext.tenants: - tenant = TenantContext.get_tenant(tenant_id) - TenantContext.tenants.pop(tenant.tenant_id) - TenantContext.tenant_domains.pop(tenant.tenant_domain) - - @staticmethod - def update(tenants): - for tenant in tenants: - TenantContext.add_tenant(tenant) - - @staticmethod - def get_tenant(tenant_id): - """ - Gets the Tenant object of the provided tenant ID - :param int tenant_id: - :return: Tenant object of the provided tenant ID - :rtype: Tenant - """ - if tenant_id in TenantContext.tenants: - return TenantContext.tenants[tenant_id] - - return None - - @staticmethod - def get_tenant_by_domain(tenant_domain): - """ - Gets the Tenant object of the provided tenant domain - :param str tenant_domain: - :return: Tenant object of the provided tenant domain - :rtype: str - """ - if tenant_domain in TenantContext.tenant_domains: - return TenantContext.tenant_domains[tenant_domain] - - return None \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/7f5ca0c1/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/topology/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/topology/__init__.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/topology/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/topology/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License.
