http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/healthstatspublisher/healthstats.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/healthstatspublisher/healthstats.py b/tools/python-cartridge-agent/cartridgeagent/modules/healthstatspublisher/healthstats.py new file mode 100644 index 0000000..4ceb948 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/healthstatspublisher/healthstats.py @@ -0,0 +1,246 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from threading import Thread +import time +import psutil +import os + +from abstracthealthstatisticspublisher import * +from ..databridge.agent import * +from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration +from ..util import cartridgeagentutils, cartridgeagentconstants + + +class HealthStatisticsPublisherManager(Thread): + """ + Read from an implementation of AbstractHealthStatisticsPublisher the value for memory usage and + load average and publishes them as ThriftEvents to a CEP server + """ + STREAM_NAME = "cartridge_agent_health_stats" + STREAM_VERSION = "1.0.0" + STREAM_NICKNAME = "agent health stats" + STREAM_DESCRIPTION = "agent health stats" + + def __init__(self, publish_interval): + """ + Initializes a new HealthStatistsPublisherManager with a given number of seconds as the interval + :param int publish_interval: Number of seconds as the interval + :return: void + """ + Thread.__init__(self) + + self.log = LogFactory().get_log(__name__) + + self.publish_interval = publish_interval + """:type : int""" + + self.terminated = False + + self.publisher = HealthStatisticsPublisher() + """:type : HealthStatisticsPublisher""" + # TODO: load plugins for the reader + self.stats_reader = DefaultHealthStatisticsReader() + """:type : AbstractHealthStatisticsReader""" + + def run(self): + while not self.terminated: + time.sleep(self.publish_interval) + + cartridge_stats = self.stats_reader.stat_cartridge_health() + self.log.debug("Publishing memory consumption: %r" % cartridge_stats.memory_usage) + self.publisher.publish_memory_usage(cartridge_stats.memory_usage) + + self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg) + self.publisher.publish_load_average(cartridge_stats.load_avg) + + self.publisher.publisher.disconnect() + + +class HealthStatisticsPublisher: + """ + Publishes memory usage and load average to thrift server + """ + log = LogFactory().get_log(__name__) + + def __init__(self): + + self.ports = [] + self.ports.append(CEPPublisherConfiguration.get_instance().server_port) + + self.cartridge_agent_config = CartridgeAgentConfiguration() + + cartridgeagentutils.wait_until_ports_active( + CEPPublisherConfiguration.get_instance().server_ip, + self.ports, + int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) + cep_active = cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports) + if not cep_active: + raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") + + self.stream_definition = HealthStatisticsPublisher.create_stream_definition() + HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition)) + self.publisher = ThriftPublisher( + CEPPublisherConfiguration.get_instance().server_ip, + CEPPublisherConfiguration.get_instance().server_port, + CEPPublisherConfiguration.get_instance().admin_username, + CEPPublisherConfiguration.get_instance().admin_password, + self.stream_definition) + + HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher initialized") + + @staticmethod + def create_stream_definition(): + """ + Create a StreamDefinition for publishing to CEP + """ + stream_def = StreamDefinition() + stream_def.name = HealthStatisticsPublisherManager.STREAM_NAME + stream_def.version = HealthStatisticsPublisherManager.STREAM_VERSION + stream_def.nickname = HealthStatisticsPublisherManager.STREAM_NICKNAME + stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION + + stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("member_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("partition_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("health_description", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("value", StreamDefinition.DOUBLE) + + return stream_def + + def publish_memory_usage(self, memory_usage): + """ + Publishes the given memory usage value to the thrift server as a ThriftEvent + :param float memory_usage: memory usage + """ + + event = ThriftEvent() + event.payloadData.append(self.cartridge_agent_config.cluster_id) + event.payloadData.append(self.cartridge_agent_config.network_partition_id) + event.payloadData.append(self.cartridge_agent_config.member_id) + event.payloadData.append(self.cartridge_agent_config.partition_id) + event.payloadData.append(cartridgeagentconstants.MEMORY_CONSUMPTION) + event.payloadData.append(memory_usage) + + HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [version] %r" % (self.stream_definition.name, self.stream_definition.version)) + self.publisher.publish(event) + + def publish_load_average(self, load_avg): + """ + Publishes the given load average value to the thrift server as a ThriftEvent + :param float load_avg: load average value + """ + + event = ThriftEvent() + event.payloadData.append(self.cartridge_agent_config.cluster_id) + event.payloadData.append(self.cartridge_agent_config.network_partition_id) + event.payloadData.append(self.cartridge_agent_config.member_id) + event.payloadData.append(self.cartridge_agent_config.partition_id) + event.payloadData.append(cartridgeagentconstants.LOAD_AVERAGE) + event.payloadData.append(load_avg) + + HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [version] %r" % (self.stream_definition.name, self.stream_definition.version)) + self.publisher.publish(event) + + +class DefaultHealthStatisticsReader(AbstractHealthStatisticsReader): + """ + Default implementation of the AbstractHealthStatisticsReader + """ + + def __init__(self): + self.log = LogFactory().get_log(__name__) + + def stat_cartridge_health(self): + cartridge_stats = CartridgeHealthStatistics() + cartridge_stats.memory_usage = DefaultHealthStatisticsReader.__read_mem_usage() + cartridge_stats.load_avg = DefaultHealthStatisticsReader.__read_load_avg() + + self.log.debug("Memory read: %r, CPU read: %r" % (cartridge_stats.memory_usage, cartridge_stats.load_avg)) + return cartridge_stats + + @staticmethod + def __read_mem_usage(): + return psutil.virtual_memory().percent + + @staticmethod + def __read_load_avg(): + (one, five, fifteen) = os.getloadavg() + return one + + +class CEPPublisherConfiguration: + """ + TODO: Extract common functionality + """ + + __instance = None + log = LogFactory().get_log(__name__) + + @staticmethod + def get_instance(): + """ + Singleton instance retriever + :return: Instance + :rtype : CEPPublisherConfiguration + """ + if CEPPublisherConfiguration.__instance is None: + CEPPublisherConfiguration.__instance = CEPPublisherConfiguration() + + return CEPPublisherConfiguration.__instance + + def __init__(self): + self.enabled = False + self.server_ip = None + self.server_port = None + self.admin_username = None + self.admin_password = None + self.cartridge_agent_config = CartridgeAgentConfiguration() + + self.read_config() + + def read_config(self): + self.enabled = True if self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_PUBLISHER_ENABLED, False).strip().lower() == "true" else False + if not self.enabled: + CEPPublisherConfiguration.log.info("CEP Publisher disabled") + return + + CEPPublisherConfiguration.log.info("CEP Publisher enabled") + + self.server_ip = self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_RECEIVER_IP, False) + if self.server_ip is None or self.server_ip.strip() == "": + raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_IP) + + self.server_port = self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_RECEIVER_PORT, False) + if self.server_port is None or self.server_port.strip() == "": + raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_PORT) + + self.admin_username = self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME, False) + if self.admin_username is None or self.admin_username.strip() == "": + raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME) + + self.admin_password = self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD, False) + if self.admin_password is None or self.admin_password.strip() == "": + raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD) + + CEPPublisherConfiguration.log.info("CEP Publisher configuration initialized")
http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/publisher/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/publisher/__init__.py b/tools/python-cartridge-agent/cartridgeagent/modules/publisher/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/publisher/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py b/tools/python-cartridge-agent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py new file mode 100644 index 0000000..1ce8ffb --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py @@ -0,0 +1,165 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging + +import paho.mqtt.publish as publish + +from .. event.instance.status.events import * +from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration +from .. util import cartridgeagentconstants +from .. healthstatspublisher.healthstats import * +from .. healthstatspublisher.abstracthealthstatisticspublisher import * + + +log = LogFactory().get_log(__name__) + +started = False +activated = False +ready_to_shutdown = False +maintenance = False + +publishers = {} +""" :type : dict[str, EventPublisher] """ + + +def publish_instance_started_event(): + global started, log + if not started: + log.info("Publishing instance started event") + service_name = CartridgeAgentConfiguration().service_name + cluster_id = CartridgeAgentConfiguration().cluster_id + network_partition_id = CartridgeAgentConfiguration().network_partition_id + parition_id = CartridgeAgentConfiguration().partition_id + member_id = CartridgeAgentConfiguration().member_id + + instance_started_event = InstanceStartedEvent(service_name, cluster_id, network_partition_id, parition_id, + member_id) + publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_STARTED_EVENT) + publisher.publish(instance_started_event) + started = True + log.info("Instance started event published") + else: + log.warn("Instance already started") + + +def publish_instance_activated_event(): + global activated, log + if not activated: + log.info("Publishing instance activated event") + service_name = CartridgeAgentConfiguration().service_name + cluster_id = CartridgeAgentConfiguration().cluster_id + network_partition_id = CartridgeAgentConfiguration().network_partition_id + parition_id = CartridgeAgentConfiguration().partition_id + member_id = CartridgeAgentConfiguration().member_id + + instance_activated_event = InstanceActivatedEvent(service_name, cluster_id, network_partition_id, parition_id, + member_id) + publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_ACTIVATED_EVENT) + publisher.publish(instance_activated_event) + + log.info("Instance activated event published") + log.info("Starting health statistics notifier") + + if CEPPublisherConfiguration.get_instance().enabled: + interval_default = 15 # seconds + interval = CartridgeAgentConfiguration().read_property("stats.notifier.interval", False) + if interval is not None and len(interval) > 0: + try: + interval = int(interval) + except ValueError: + interval = interval_default + else: + interval = interval_default + + health_stats_publisher = HealthStatisticsPublisherManager(interval) + log.info("Starting Health statistics publisher with interval %r" % interval_default) + health_stats_publisher.start() + else: + log.warn("Statistics publisher is disabled") + + activated = True + log.info("Health statistics notifier started") + else: + log.warn("Instance already activated") + + +def publish_maintenance_mode_event(): + global maintenance, log + if not maintenance: + log.info("Publishing instance maintenance mode event") + + service_name = CartridgeAgentConfiguration().service_name + cluster_id = CartridgeAgentConfiguration().cluster_id + network_partition_id = CartridgeAgentConfiguration().network_partition_id + parition_id = CartridgeAgentConfiguration().partition_id + member_id = CartridgeAgentConfiguration().member_id + + instance_maintenance_mode_event = InstanceMaintenanceModeEvent(service_name, cluster_id, network_partition_id, parition_id, + member_id) + + publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_MAINTENANCE_MODE_EVENT) + publisher.publish(instance_maintenance_mode_event) + + maintenance = True + log.info("Instance Maintenance mode event published") + else: + log.warn("Instance already in a Maintenance mode....") + + +def publish_instance_ready_to_shutdown_event(): + global ready_to_shutdown, log + if not ready_to_shutdown: + log.info("Publishing instance activated event") + + service_name = CartridgeAgentConfiguration().service_name + cluster_id = CartridgeAgentConfiguration().cluster_id + network_partition_id = CartridgeAgentConfiguration().network_partition_id + parition_id = CartridgeAgentConfiguration().partition_id + member_id = CartridgeAgentConfiguration().member_id + + instance_shutdown_event = InstanceReadyToShutdownEvent(service_name, cluster_id, network_partition_id, parition_id, + member_id) + + publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_READY_TO_SHUTDOWN_EVENT) + publisher.publish(instance_shutdown_event) + + ready_to_shutdown = True + log.info("Instance ReadyToShutDown event published") + else: + log.warn("Instance already in a ReadyToShutDown event....") + + +def get_publisher(topic): + if topic not in publishers: + publishers[topic] = EventPublisher(topic) + + return publishers[topic] + + +class EventPublisher: + """ + Handles publishing events to topics to the provided message broker + """ + def __init__(self, topic): + self.__topic = topic + + def publish(self, event): + mb_ip = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_IP) + mb_port = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_PORT) + payload = event.to_json() + publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/subscriber/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/subscriber/__init__.py b/tools/python-cartridge-agent/cartridgeagent/modules/subscriber/__init__.py new file mode 100644 index 0000000..2456923 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/subscriber/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/subscriber/eventsubscriber.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/subscriber/eventsubscriber.py b/tools/python-cartridge-agent/cartridgeagent/modules/subscriber/eventsubscriber.py new file mode 100644 index 0000000..bc026dd --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/subscriber/eventsubscriber.py @@ -0,0 +1,96 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import threading +import paho.mqtt.client as mqtt + + +class EventSubscriber(threading.Thread): + """ + Provides functionality to subscribe to a given topic on the stratos MB and + register event handlers for various events. + """ + + def __init__(self, topic, ip, port): + threading.Thread.__init__(self) + + #{"ArtifactUpdateEvent" : onArtifactUpdateEvent()} + self.__event_handlers = {} + + self.log = LogFactory().get_log(__name__) + + self.__mb_client = None + + self.__topic = topic + + self.__subscribed = False + + self.__ip = ip + self.__port = port + + def run(self): + self.__mb_client = mqtt.Client() + self.__mb_client.on_connect = self.on_connect + self.__mb_client.on_message = self.on_message + + self.log.debug("Connecting to the message broker with address %r:%r" % (self.__ip, self.__port)) + self.__mb_client.connect(self.__ip, self.__port, 60) + self.__subscribed = True + self.__mb_client.loop_forever() + + def register_handler(self, event, handler): + """ + Adds an event handler function mapped to the provided event. + :param str event: Name of the event to attach the provided handler + :param handler: The handler function + :return: void + :rtype: void + """ + self.__event_handlers[event] = handler + self.log.debug("Registered handler for event %r" % event) + + def on_connect(self, client, userdata, flags, rc): + self.log.debug("Connected to message broker.") + self.__mb_client.subscribe(self.__topic) + self.log.debug("Subscribed to %r" % self.__topic) + + def on_message(self, client, userdata, msg): + self.log.debug("Message received: %r:\n%r" % (msg.topic, msg.payload)) + + event = msg.topic.rpartition('/')[2] + + if event in self.__event_handlers: + handler = self.__event_handlers[event] + + try: + self.log.debug("Executing handler for event %r" % event) + handler(msg) + except: + self.log.exception("Error processing %r event" % event) + else: + self.log.debug("Event handler not found for event : %r" % event) + + def is_subscribed(self): + """ + Checks if this event subscriber is successfully subscribed to the provided topic + :return: True if subscribed, False if otherwise + :rtype: bool + """ + return self.__subscribed + + +from .. util.log import LogFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/tenant/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/tenant/__init__.py b/tools/python-cartridge-agent/cartridgeagent/modules/tenant/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/tenant/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/tenant/tenantcontext.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/tenant/tenantcontext.py b/tools/python-cartridge-agent/cartridgeagent/modules/tenant/tenantcontext.py new file mode 100644 index 0000000..202bd35 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/tenant/tenantcontext.py @@ -0,0 +1,184 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class Tenant: + """ + Object type representing the tenant details of a single tenant + """ + + def __init__(self, tenant_id, tenant_domain): + self.tenant_id = tenant_id + """ :type : int """ + self.tenant_domain = tenant_domain + """ :type : str """ + self.service_name_subscription_map = {} + """ :type : dict[str, Subscription] """ + + def get_subscription(self, service_name): + """ + Returns the Subscription object related to the provided service name + :param str service_name: service name to be retrieved + :return: Subscription of the service or None if the service name doesn't exist + :rtype: Subscription + """ + if service_name in self.service_name_subscription_map: + return self.service_name_subscription_map[service_name] + + return None + + def is_subscribed(self, service_name): + """ + Checks if the given service name has a subscription from this tenant + :param str service_name: name of the service to check + :return: True if the tenant is subscribed to the given service name, False if not + :rtype: bool + """ + return service_name in self.service_name_subscription_map + + def add_subscription(self, subscription): + """ + Adds a subscription information entry on the subscription list for this tenant + :param Subscription subscription: Subscription information to be added + :return: void + :rtype: void + """ + self.service_name_subscription_map[subscription.service_name] = subscription + + def remove_subscription(self, service_name): + """ + Removes the specified subscription details from the subscription list + :param str service_name: The service name of the subscription to be removed + :return: void + :rtype: void + """ + if service_name in self.service_name_subscription_map: + self.service_name_subscription_map.pop(service_name) + + +class Subscription: + """ + Subscription information of a particular subscription to a service + """ + + def __init__(self, service_name, cluster_ids): + self.service_name = service_name + """ :type : str """ + self.cluster_ids = cluster_ids + """ :type : list[str] """ + self.subscription_domain_map = {} + """ :type : dict[str, SubscriptionDomain] """ + + def add_subscription_domain(self, domain_name, application_context): + """ + Adds a subscription domain + :param str domain_name: + :param str application_context: + :return: void + :rtype: void + """ + self.subscription_domain_map[domain_name] = SubscriptionDomain(domain_name, application_context) + + def remove_subscription_domain(self, domain_name): + """ + Removes the subscription domain of the specified domain name + :param str domain_name: + :return: void + :rtype: void + """ + if domain_name in self.subscription_domain_map: + self.subscription_domain_map.pop(domain_name) + + def subscription_domain_exists(self, domain_name): + """ + Returns the SubscriptionDomain information of the specified domain name + :param str domain_name: + :return: SubscriptionDomain + :rtype: SubscriptionDomain + """ + return domain_name in self.subscription_domain_map + + def get_subscription_domains(self): + """ + Returns the list of subscription domains of this subscription + :return: List of SubscriptionDomain objects + :rtype: list[SubscriptionDomain] + """ + return self.subscription_domain_map.values() + + +class SubscriptionDomain: + """ + Represents a Subscription Domain + """ + + def __init__(self, domain_name, application_context): + self.domain_name = domain_name + """ :type : str """ + self.application_context = application_context + """ :type : str """ + + +class TenantContext: + """ + Handles and maintains a model of all the information related to tenants within this instance + """ + tenants = {} + initialized = False + tenant_domains = {"carbon.super": Tenant(-1234, "carbon.super")} + + @staticmethod + def add_tenant(tenant): + TenantContext.tenants[tenant.tenant_id] = tenant + TenantContext.tenant_domains[tenant.tenant_domain] = tenant + + @staticmethod + def remove_tenant(tenant_id): + if tenant_id in TenantContext.tenants: + tenant = TenantContext.get_tenant(tenant_id) + TenantContext.tenants.pop(tenant.tenant_id) + TenantContext.tenant_domains.pop(tenant.tenant_domain) + + @staticmethod + def update(tenants): + for tenant in tenants: + TenantContext.add_tenant(tenant) + + @staticmethod + def get_tenant(tenant_id): + """ + Gets the Tenant object of the provided tenant ID + :param int tenant_id: + :return: Tenant object of the provided tenant ID + :rtype: Tenant + """ + if tenant_id in TenantContext.tenants: + return TenantContext.tenants[tenant_id] + + return None + + @staticmethod + def get_tenant_by_domain(tenant_domain): + """ + Gets the Tenant object of the provided tenant domain + :param str tenant_domain: + :return: Tenant object of the provided tenant domain + :rtype: str + """ + if tenant_domain in TenantContext.tenant_domains: + return TenantContext.tenant_domains[tenant_domain] + + return None \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/topology/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/topology/__init__.py b/tools/python-cartridge-agent/cartridgeagent/modules/topology/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/topology/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/topology/topologycontext.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/topology/topologycontext.py b/tools/python-cartridge-agent/cartridgeagent/modules/topology/topologycontext.py new file mode 100644 index 0000000..5fe2ea4 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/topology/topologycontext.py @@ -0,0 +1,454 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from ..util import cartridgeagentconstants + + +class Topology: + """ + Represents the topology provided by the Cloud Controller + """ + + def __init__(self): + self.service_map = {} + """ :type : dict[str, Service] """ + self.initialized = False + """ :type : bool """ + self.json_str = None + """ :type : str """ + + def get_services(self): + """ + Provides the list of services on the topology + :return: The list of Service objects + :rtype: list[Service] + """ + return self.service_map.values() + + def get_service(self, service_name): + """ + Provides the service information for the given service name + :param str service_name: service name to be retrieved + :return: Service object of the service, None if the provided service name is invalid + :rtype: Service + """ + if service_name in self.service_map: + return self.service_map[service_name] + + return None + + def add_service(self, service): + """ + Adds a service to the list of services on the topology + + :param Service service: + :return: void + """ + self.service_map[service.service_name] = service + + def add_services(self, services): + """ + + :param list[Service] services: + :return: void + """ + for service in services: + self.add_service(service) + + def remove_service(self, service_name): + """ + Removes the service of the provided service name + :param str service_name: + :return: void + """ + if service_name in self.service_map: + self.service_map.pop(service_name) + + def service_exists(self, service_name): + """ + Checks if the service of the provided service name exists + :param str service_name: + :return: True if the service exists, False if otherwise + :rtype: bool + """ + return service_name in self.service_map + + def clear(self): + """ + Clears the service information list + :return: void + """ + self.service_map = {} + + def __str__(self): + """ + to string override + :return: + """ + return "Topology [serviceMap= %r , initialized= %r ]" % (self.service_map, self.initialized) + + +class Service: + """ + Represents a service on the topology + """ + + def __init__(self, service_name, service_type): + self.service_name = service_name + """ :type : str """ + self.service_type = service_type + """ :type : str """ + self.cluster_id_cluster_map = {} + """ :type : dict[str, Cluster] """ + self.port_map = {} + """ :type : dict[str, Port] """ + self.properties = {} + """ :type : dict[str, str] """ + + def get_clusters(self): + """ + Provides the list of clusters in the particular service + :return: The list of Cluster objects + :rtype: list[Cluster] + """ + return self.cluster_id_cluster_map.values() + + def add_cluster(self, cluster): + """ + Adds a cluster to the service + :param Cluster cluster: the cluster to be added + :return: void + """ + self.cluster_id_cluster_map[cluster.cluster_id] = cluster + + def remove_cluster(self, cluster_id): + if cluster_id in self.cluster_id_cluster_map: + self.cluster_id_cluster_map.pop(cluster_id) + + def cluster_exists(self, cluster_id): + """ + Checks if the cluster with the given cluster id exists for ther service + :param str cluster_id: + :return: True if the cluster for the given cluster id exists, False if otherwise + :rtype: bool + """ + return cluster_id in self.cluster_id_cluster_map + + def get_cluster(self, cluster_id): + """ + Provides the Cluster information for the provided cluster id + :param str cluster_id: the cluster id to search for + :return: Cluster object for the given cluster id, None if the cluster id is invalid + :rtype: Cluster + """ + if cluster_id in self.cluster_id_cluster_map: + return self.cluster_id_cluster_map[cluster_id] + + return None + + def get_ports(self): + """ + Returns the list of ports in the particular service + :return: The list of Port object + :rtype: list[Port] + """ + return self.port_map.values() + + def get_port(self, proxy_port): + """ + Provides the port information for the provided proxy port + :param str proxy_port: + :return: Port object for the provided port, None if port is invalid + :rtype: Port + """ + if proxy_port in self.port_map: + return self.port_map[proxy_port] + + return None + + def add_port(self, port): + self.port_map[port.proxy] = port + + def add_ports(self, ports): + for port in ports: + self.add_port(port) + + +class Cluster: + """ + Represents a cluster for a service + """ + + def __init__(self, service_name="", cluster_id="", deployment_policy_name="", autoscale_policy_name=""): + self.service_name = service_name + """ :type : str """ + self.cluster_id = cluster_id + """ :type : str """ + self.deployment_policy_name = deployment_policy_name + """ :type : str """ + self.autoscale_policy_name = autoscale_policy_name + """ :type : str """ + self.hostnames = [] + """ :type : list[str] """ + self.member_map = {} + """ :type : dict[str, Member] """ + + self.tenant_range = None + """ :type : str """ + self.is_lb_cluster = False + """ :type : bool """ + self.is_kubernetes_cluster = False + """ :type : bool """ + self.status = None + """ :type : str """ + self.load_balancer_algorithm_name = None + """ :type : str """ + self.properties = {} + """ :type : dict[str, str] """ + self.member_list_json = None + """ :type : str """ + + def add_hostname(self, hostname): + self.hostnames.append(hostname) + + def set_tenant_range(self, tenant_range): + self.validate_tenant_range(tenant_range) + self.tenant_range = tenant_range + + def get_members(self): + """ + Provides the list of member information in the cluster + :return: The list of Member object + :rtype: list[Member] + """ + return self.member_map.values() + + def add_member(self, member): + self.member_map[member.member_id] = member + + def remove_member(self, member_id): + if self.member_exists(member_id): + self.member_map.pop(member_id) + + def get_member(self, member_id): + """ + Provides the member information for the provided member id + :param str member_id: + :return: Member object for the provided member id, None if member id is invalid + :rtype: Member + """ + if self.member_exists(member_id): + return self.member_map[member_id] + + return None + + def member_exists(self, member_id): + """ + Checks if the member for the provided member id exists in this cluster + :param str member_id: member id to be searched + :return: True if the member exists, False if otherwise + :rtype: bool + """ + return member_id in self.member_map + + def __str__(self): + return "Cluster [serviceName=" + self.service_name + ", clusterId=" + self.cluster_id \ + + ", autoscalePolicyName=" + self.autoscale_policy_name + ", deploymentPolicyName=" \ + + self.deployment_policy_name + ", hostNames=" + self.hostnames + ", tenantRange=" + self.tenant_range \ + + ", isLbCluster=" + self.is_lb_cluster + ", properties=" + self.properties + "]" + + def tenant_id_in_range(self, tenant_id): + """ + Check whether a given tenant id is in tenant range of the cluster. + :param str tenant_id: tenant id to be checked + :return: True if the tenant id is in tenant id range, False if otherwise + :rtype: bool + """ + if self.tenant_range is None: + return False + + if self.tenant_range == "*": + return True + else: + arr = self.tenant_range.split(cartridgeagentconstants.TENANT_RANGE_DELIMITER) + tenant_start = int(arr[0]) + if tenant_start <= tenant_id: + tenant_end = arr[1] + if tenant_end == "*": + return True + else: + if tenant_id <= int(tenant_end): + return True + + return False + + def validate_tenant_range(self, tenant_range): + """ + Validates the tenant range to be either '*' or a delimeted range of numbers + :param str tenant_range: The tenant range string to be validated + :return: void if the provided tenant range is valid, RuntimeError if otherwise + :exception: RuntimeError if the tenant range is invalid + """ + valid = False + if tenant_range == "*": + valid = True + else: + arr = tenant_range.split(cartridgeagentconstants.TENANT_RANGE_DELIMITER) + if len(arr) == 2: + if arr[0].isdigit() and arr[1].isdigit(): + valid = True + elif arr[0].isdigit() and arr[1] == "*": + valid = True + + if not valid: + raise RuntimeError("Tenant range %r is not valid" % tenant_range) + + +class Member: + """ + Represents a member on a particular cluster + """ + + def __init__(self, service_name="", cluster_id="", network_partition_id="", parition_id="", member_id=""): + self.service_name = service_name + """ :type : str """ + self.cluster_id = cluster_id + """ :type : str """ + self.network_partition_id = network_partition_id + """ :type : str """ + self.partition_id = parition_id + """ :type : str """ + self.member_id = member_id + """ :type : str """ + self.port_map = {} + """ :type : dict[str, Port] """ + + self.member_public_ip = None + """ :type : str """ + self.status = None + """ :type : str """ + self.member_ip = None + """ :type : str """ + self.properties = {} + """ :type : dict[str, str] """ + self.lb_cluster_id = None + """ :type : str """ + self.json_str = None + """ :type : str """ + + def is_active(self): + """ + Checks if the member is in active state + :return: True if active, False if otherwise + :rtype: bool + """ + return self.status == MemberStatus.Activated + + def get_ports(self): + """ + Provides the list of the ports in the member + :return: List of Port objects + :rtype: list[Port] + """ + return self.port_map.values() + + def get_port(self, proxy): + """ + Provides the port information for the given port id + :param str proxy: The port id + :return: Port object of the provided port id, None if otherwise + :rtype: Port + """ + if proxy in self.port_map: + return self.port_map[proxy] + + return None + + def add_port(self, port): + self.port_map[port.proxy] = port + + def add_ports(self, ports): + for port in ports: + self.add_port(port) + + +class Port: + """ + Represents a port on a particular member + """ + + def __init__(self, protocol, value, proxy): + self.protocol = protocol + """ :type : str """ + self.value = value + """ :type : str """ + self.proxy = proxy + """ :type : str """ + + def __str__(self): + return "Port [protocol=%r, value=%r proxy=%r]" % (self.protocol, self.value, self.proxy) + + +class ServiceType: + """ + ServiceType enum + """ + SingleTenant = 1 + MultiTenant = 2 + + +class ClusterStatus: + """ + ClusterStatus enum + """ + Created = 1 + In_Maintenance = 2 + Removed = 3 + + +class MemberStatus: + """ + MemberStatus enum + """ + Created = 1 + Starting = 2 + Activated = 3 + In_Maintenance = 4 + ReadyToShutDown = 5 + Terminated = 6 + Suspended = 0 + ShuttingDown = 0 + + +class TopologyContext: + """ + Handles and maintains a model of the topology provided by the Cloud Controller + """ + topology = None + # TODO: read write locks, Lock() and RLock() + + @staticmethod + def get_topology(): + #TODO: thread-safety missing + if TopologyContext.topology is None: + TopologyContext.topology = Topology() + return TopologyContext.topology + + @staticmethod + def update(topology): + TopologyContext.topology = topology + TopologyContext.topology.initialized = True \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/util/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/util/__init__.py b/tools/python-cartridge-agent/cartridgeagent/modules/util/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/util/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/util/asyncscheduledtask.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/util/asyncscheduledtask.py b/tools/python-cartridge-agent/cartridgeagent/modules/util/asyncscheduledtask.py new file mode 100644 index 0000000..9fdde1e --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/util/asyncscheduledtask.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import time +from threading import Thread + + +class AsyncScheduledTask(Thread): + """ + Executes a given task with a given interval until being terminated + """ + + def __init__(self, delay, task): + Thread.__init__(self) + self.delay = delay + """ :type : int """ + self.task = task + """ :type : Thread """ + self.terminated = False + """ :type : bool """ + + def run(self): + """ + Start the scheuled task with a sleep time of delay in between + :return: + """ + while not self.terminated: + time.sleep(self.delay) + self.task.start() + + def terminate(self): + """ + Terminate the scheduled task. Allow a maximum of 'delay' seconds to be terminated. + :return: void + """ + self.terminated = True \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/util/cartridgeagentconstants.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/util/cartridgeagentconstants.py b/tools/python-cartridge-agent/cartridgeagent/modules/util/cartridgeagentconstants.py new file mode 100644 index 0000000..70afb30 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/util/cartridgeagentconstants.py @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +PARAM_FILE_PATH = "param.file.path" +EXTENSIONS_DIR = "extensions.dir" + +MB_IP = "mb.ip" +MB_PORT = "mb.port" + +CARTRIDGE_KEY = "CARTRIDGE_KEY" +APP_PATH = "APP_PATH" +SERVICE_GROUP = "SERIVCE_GROUP" +SERVICE_NAME = "SERVICE_NAME" +CLUSTER_ID = "CLUSTER_ID" +LB_CLUSTER_ID = "LB_CLUSTER_ID" +NETWORK_PARTITION_ID = "NETWORK_PARTITION_ID" +PARTITION_ID = "PARTITION_ID" +MEMBER_ID = "MEMBER_ID" +TENANT_ID = "TENANT_ID" +REPO_URL = "REPO_URL" +PORTS = "PORTS" +DEPLOYMENT = "DEPLOYMENT" +MANAGER_SERVICE_TYPE = "MANAGER_SERVICE_TYPE" +WORKER_SERVICE_TYPE = "WORKER_SERVICE_TYPE" +PERSISTENCE_MAPPING = "PERSISTENCE_MAPPING" + +# stratos.sh environment variables keys +LOG_FILE_PATHS = "LOG_FILE_PATHS" +MEMORY_CONSUMPTION = "memory_consumption" +LOAD_AVERAGE = "load_average" +PORTS_NOT_OPEN = "ports_not_open" +MULTITENANT = "MULTITENANT" +CLUSTERING = "CLUSTERING" +MIN_INSTANCE_COUNT = "MIN_COUNT" +ENABLE_ARTIFACT_UPDATE = "enable.artifact.update" +ARTIFACT_UPDATE_INTERVAL = "artifact.update.interval" +COMMIT_ENABLED = "COMMIT_ENABLED" +AUTO_COMMIT = "auto.commit" +AUTO_CHECKOUT = "auto.checkout" +LISTEN_ADDRESS = "listen.address" +PROVIDER = "PROVIDER" +INTERNAL = "internal" +LB_PRIVATE_IP = "lb.private.ip" +LB_PUBLIC_IP = "lb.public.ip" + +# stratos.sh extension points shell scripts names keys +INSTANCE_STARTED_SCRIPT = "extension.instance.started" +START_SERVERS_SCRIPT = "extension.start.servers" +INSTANCE_ACTIVATED_SCRIPT = "extension.instance.activated" +ARTIFACTS_UPDATED_SCRIPT = "extension.artifacts.updated" +CLEAN_UP_SCRIPT = "extension.clean" +MOUNT_VOLUMES_SCRIPT = "extension.mount.volumes" +MEMBER_ACTIVATED_SCRIPT = "extension.member.activated" +MEMBER_TERMINATED_SCRIPT = "extension.member.terminated" +MEMBER_SUSPENDED_SCRIPT = "extension.member.suspended" +MEMBER_STARTED_SCRIPT = "extension.member.started" +COMPLETE_TOPOLOGY_SCRIPT = "extension.complete.topology" +COMPLETE_TENANT_SCRIPT = "extension.complete.tenant" +SUBSCRIPTION_DOMAIN_ADDED_SCRIPT = "extension.subscription.domain.added" +SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT = "extension.subscription.domain.removed" +ARTIFACTS_COPY_SCRIPT = "extension.artifacts.copy" +TENANT_SUBSCRIBED_SCRIPT = "extension.tenant.subscribed" +TENANT_UNSUBSCRIBED_SCRIPT = "extension.tenant.unsubscribed" + +SERVICE_GROUP_TOPOLOGY_KEY = "payload_parameter.SERIVCE_GROUP" +CLUSTERING_TOPOLOGY_KEY = "payload_parameter.CLUSTERING" +CLUSTERING_PRIMARY_KEY = "PRIMARY" + +SUPERTENANT_TEMP_PATH = "/tmp/-1234/" + +DEPLOYMENT_MANAGER = "manager" +DEPLOYMENT_WORKER = "worker" +DEPLOYMENT_DEFAULT = "default" +SUPER_TENANT_REPO_PATH = "super.tenant.repository.path" +TENANT_REPO_PATH = "tenant.repository.path" + +# topic names to subscribe +INSTANCE_NOTIFIER_TOPIC = "instance/#" +HEALTH_STAT_TOPIC = "health/#" +TOPOLOGY_TOPIC = "topology/#" +TENANT_TOPIC = "tenant/#" +INSTANCE_STATUS_TOPIC = "instance/status/" + +#Messaging Model +TENANT_RANGE_DELIMITER = "-" + +INSTANCE_STARTED_EVENT = "InstanceStartedEvent" +INSTANCE_ACTIVATED_EVENT = "InstanceActivatedEvent" +INSTANCE_MAINTENANCE_MODE_EVENT = "InstanceMaintenanceModeEvent" +INSTANCE_READY_TO_SHUTDOWN_EVENT = "InstanceReadyToShutdownEvent" + +PUBLISHER_SERVICE_NAME = "publisher" +APISTORE_SERVICE_NAME = "apistore" +APIMANAGER_SERVICE_NAME = "apim" +GATEWAY_SERVICE_NAME = "gatewaymgt" +GATEWAY_MGT_SERVICE_NAME = "gateway" +KEY_MANAGER_SERVICE_NAME = "keymanager" + +PRIMARY = "PRIMARY" +MIN_COUNT = "MIN_COUNT" + +#multi tenant constants +INVALID_TENANT_ID = "-1" +SUPER_TENANT_ID = "-1234" + +DATE_FORMAT = "%Y.%m.%d" + +PORT_CHECK_TIMEOUT = "port.check.timeout" + +CEP_PUBLISHER_ENABLED = "cep.stats.publisher.enabled" +CEP_RECEIVER_IP = "thrift.receiver.ip" +CEP_RECEIVER_PORT = "thrift.receiver.port" +CEP_SERVER_ADMIN_USERNAME = "thrift.server.admin.username" +CEP_SERVER_ADMIN_PASSWORD = "thrift.server.admin.password" + +MONITORING_PUBLISHER_ENABLED = "enable.data.publisher" +MONITORING_RECEIVER_IP = "monitoring.server.ip" +MONITORING_RECEIVER_PORT = "monitoring.server.port" +MONITORING_RECEIVER_SECURE_PORT = "monitoring.server.secure.port" +MONITORING_SERVER_ADMIN_USERNAME = "monitoring.server.admin.username" +MONITORING_SERVER_ADMIN_PASSWORD = "monitoring.server.admin.password" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/util/cartridgeagentutils.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/util/cartridgeagentutils.py b/tools/python-cartridge-agent/cartridgeagent/modules/util/cartridgeagentutils.py new file mode 100644 index 0000000..42a7dd5 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/util/cartridgeagentutils.py @@ -0,0 +1,165 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from Crypto.Cipher import AES +import base64 +import os +import time +import socket +import shutil + +from log import LogFactory + +unpad = lambda s: s[0:-ord(s[-1])] + +log = LogFactory().get_log(__name__) + +current_milli_time = lambda: int(round(time.time() * 1000)) + + +def decrypt_password(pass_str, secret): + """ + Decrypts the given password using the given secret. The encryption is assumed to be done + without IV, in AES. + :param str pass_str: Encrypted password string in Base64 encoding + :param str secret: The secret string + :return: The decrypted password + :rtype: str + """ + + if pass_str is None or pass_str.strip() == "": + return pass_str.strip() + + dec_pass = "" + + try: + log.debug("Decrypting password") + bdecoded_pass = base64.b64decode(pass_str) + #secret length should be 16 + cipher = AES.new(secret, AES.MODE_ECB) + dec_pass = unpad(cipher.decrypt(bdecoded_pass)) + except: + log.exception("Exception occurred while decrypting password") + + log.debug("Decrypted PWD: [%r]" % dec_pass) + return dec_pass + + +def create_dir(path): + """ + mkdir the provided path + :param path: The path to the directory to be made + :return: True if mkdir was successful, False if dir already exists + :rtype: bool + """ + try: + os.mkdir(path) + log.info("Successfully created directory [%r]" % path) + return True + except OSError: + log.exception("Directory creating failed in [%r]. Directory already exists. " % path) + + return False + + +def delete_folder_tree(path): + """ + Completely deletes the provided folder + :param str path: Full path of the folder + :return: void + """ + try: + shutil.rmtree(path) + log.debug("Directory [%r] deleted." % path) + except OSError: + log.exception("Deletion of folder path %r failed." % path) + + +def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000): + """ + Blocks until the given list of ports become active + :param str ip_address: Ip address of the member to be checked + :param list[str] ports: List of ports to be checked + :param int ports_check_timeout: The timeout in milliseconds, defaults to 1000*60*10 + :return: void + """ + if ports_check_timeout is None: + ports_check_timeout = 1000 * 60 * 10 + + log.debug("Port check timeout: %r" % ports_check_timeout) + + active = False + start_time = current_milli_time() + while not active: + log.info("Waiting for ports to be active: [ip] %r [ports] %r" % (ip_address, ports)) + active = check_ports_active(ip_address, ports) + end_time = current_milli_time() + duration = end_time - start_time + + if duration > ports_check_timeout: + return + + time.sleep(5) + + log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports)) + + +def check_ports_active(ip_address, ports): + """ + Checks the given list of port addresses for active state + :param str ip_address: Ip address of the member to be checked + :param list[str] ports: The list of ports to be checked + :return: True if the ports are active, False if at least one is not active + :rtype: bool + """ + if len(ports) < 1: + raise RuntimeError("No ports found") + + for port in ports: + s = socket.socket() + s.settimeout(5) + try: + s.connect((ip_address, int(port))) + log.debug("Port %r is active" % port) + s.close() + except socket.error: + log.debug("Print %r is not active" % port) + return False + + return True + + +def get_carbon_server_property(property_key): + """ + Reads the carbon.xml file and returns the value for the property key. + TODO: Get carbon server xml location + :param str property_key: Property key to look for + :return: The value of the property, None if the property key is invalid or not present + :rtype : str + """ + + raise NotImplementedError + + +def get_working_dir(): + """ + Returns the base directory of the cartridge agent. + :return: Base working dir path + :rtype : str + """ + #"/path/to/cartridge-agent/modules/util/".split("modules") returns ["/path/to/cartridge-agent/", "/util"] + return os.path.abspath(os.path.dirname(__file__)).split("modules")[0] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/util/extensionutils.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/util/extensionutils.py b/tools/python-cartridge-agent/cartridgeagent/modules/util/extensionutils.py new file mode 100644 index 0000000..6c58852 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/util/extensionutils.py @@ -0,0 +1,494 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +import os +import subprocess +import time + +from log import LogFactory +from .. config import cartridgeagentconfiguration + + +log = LogFactory().get_log(__name__) + +cartridge_agent_config = cartridgeagentconfiguration.CartridgeAgentConfiguration() + + +def execute_copy_artifact_extension(source, destination): + try: + log.debug("Executing artifacts copy extension") + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.ARTIFACTS_COPY_SCRIPT, False) + command = prepare_command(script_name) + + output, errors = execute_command(command + " " + source + " " + destination) + log.debug("Artifacts copy script returned: %r" % output) + except: + log.exception("Could not execute artifacts copy extension") + + +def execute_instance_started_extension(env_params): + try: + log.debug("Executing instance started extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.INSTANCE_STARTED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Instance started script returned: %r" % output) + except: + log.exception("Could not execute instance started extension") + + +def execute_instance_activated_extension(): + try: + log.debug("Executing instance activated extension") + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.INSTANCE_ACTIVATED_SCRIPT, False) + command = prepare_command(script_name) + + output, errors = execute_command(command) + log.debug("Instance activated script returned: %r" % output) + except: + log.exception("Could not execute instance activated extension") + + +def execute_artifacts_updated_extension(env_params): + try: + log.debug("Executing artifacts updated extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.ARTIFACTS_UPDATED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Artifacts updated script returned: %r" % output) + except: + log.exception("Could not execute artifacts updated extension") + + +def execute_subscription_domain_added_extension(env_params): + try: + log.debug("Executing subscription domain added extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.SUBSCRIPTION_DOMAIN_ADDED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Subscription domain added script returned: %r" % output) + except: + log.exception("Could not execute subscription domain added extension") + + +def execute_subscription_domain_removed_extension(env_params): + try: + log.debug("Executing subscription domain removed extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Subscription domain removed script returned: %r" % output) + except: + log.exception("Could not execute subscription domain removed extension") + + +def execute_start_servers_extension(env_params): + try: + log.debug("Executing start servers extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.START_SERVERS_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Start servers script returned: %r" % output) + except: + log.exception("Could not execute start servers extension") + + +def execute_complete_topology_extension(env_params): + try: + log.debug("Executing complete topology extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.COMPLETE_TOPOLOGY_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Complete topology script returned: %r" % output) + except: + log.exception("Could not execute complete topology extension") + + +def execute_complete_tenant_extension(env_params): + try: + log.debug("Executing complete tenant extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.COMPLETE_TENANT_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Complete tenant script returned: %r" % output) + except: + log.exception("Could not execute complete tenant extension") + + +def execute_tenant_subscribed_extension(env_params): + try: + log.debug("Executing tenant subscribed extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.TENANT_SUBSCRIBED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Tenant subscribed script returned: %r" % output) + except: + log.exception("Could not execute tenant subscribed extension") + + +def execute_tenant_unsubscribed_extension(env_params): + try: + log.debug("Executing tenant unsubscribed extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.TENANT_UNSUBSCRIBED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Tenant unsubscribed script returned: %r" % output) + except: + log.exception("Could not execute tenant unsubscribed extension") + + +def execute_member_terminated_extension(env_params): + try: + log.debug("Executing member terminated extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MEMBER_TERMINATED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Member terminated script returned: %r" % output) + except: + log.exception("Could not execute member terminated extension") + + +def execute_member_suspended_extension(env_params): + try: + log.debug("Executing member suspended extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MEMBER_SUSPENDED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Member suspended script returned: %r" % output) + except: + log.exception("Could not execute member suspended extension") + + +def execute_member_started_extension(env_params): + try: + log.debug("Executing member started extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MEMBER_STARTED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Member started script returned: %r" % output) + except: + log.exception("Could not execute member started extension") + + +def wait_for_complete_topology(): + while not TopologyContext.topology.initialized: + log.info("Waiting for complete topology event...") + time.sleep(5) + + +def check_topology_consistency(service_name, cluster_id, member_id): + topology = TopologyContext.get_topology() + service = topology.get_service(service_name) + if service is None: + log.error("Service not found in topology [service] %r" % service_name) + return False + + cluster = service.get_cluster(cluster_id) + if cluster is None: + log.error("Cluster id not found in topology [cluster] %r" % cluster_id) + return False + + activated_member = cluster.get_member(member_id) + if activated_member is None: + log.error("Member id not found in topology [member] %r" % member_id) + return False + + return True + + +def is_relevant_member_event(service_name, cluster_id, lb_cluster_id): + cluster_id_in_payload = cartridge_agent_config.cluster_id + if cluster_id_in_payload is None: + return False + + topology = TopologyContext.get_topology() + if topology is None or not topology.initialized: + return False + + if cluster_id_in_payload == cluster_id: + return True + + if cluster_id_in_payload == lb_cluster_id: + return True + + service_group_in_payload = cartridge_agent_config.service_group + if service_group_in_payload is not None: + service_properties = topology.get_service(service_name).properties + if service_properties is None: + return False + + member_service_group = service_properties[cartridgeagentconstants.SERVICE_GROUP_TOPOLOGY_KEY] + if member_service_group is not None and member_service_group == service_group_in_payload: + if service_name == cartridge_agent_config.service_name: + log.debug("Service names are same") + return True + elif cartridgeagentconstants.APISTORE_SERVICE_NAME == \ + cartridge_agent_config.service_name \ + and service_name == cartridgeagentconstants.PUBLISHER_SERVICE_NAME: + log.debug("Service name in payload is [store]. Serivce name in event is [%r] " % service_name) + return True + elif cartridgeagentconstants.PUBLISHER_SERVICE_NAME == \ + cartridge_agent_config.service_name \ + and service_name == cartridgeagentconstants.APISTORE_SERVICE_NAME: + log.debug("Service name in payload is [publisher]. Serivce name in event is [%r] " % service_name) + return True + elif cartridgeagentconstants.DEPLOYMENT_WORKER == \ + cartridge_agent_config.deployment \ + and service_name == cartridge_agent_config.manager_service_name: + log.debug("Deployment is worker. Worker's manager service name & service name in event are same") + return True + elif cartridgeagentconstants.DEPLOYMENT_MANAGER == \ + cartridge_agent_config.deployment \ + and service_name == cartridge_agent_config.worker_service_name: + log.debug("Deployment is manager. Manager's worker service name & service name in event are same") + return True + + return False + + +def execute_volume_mount_extension(persistance_mappings_payload): + try: + log.debug("Executing volume mounting extension: [payload] %r" % persistance_mappings_payload) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MOUNT_VOLUMES_SCRIPT, False) + command = prepare_command(script_name) + + output, errors = execute_command(command + " " + persistance_mappings_payload) + log.debug("Volume mount script returned: %r" % output) + except: + log.exception("Could not execute Volume mount extension") + + +def execute_cleanup_extension(): + try: + log.debug("Executing cleanup extension") + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.CLEAN_UP_SCRIPT, False) + command = prepare_command(script_name) + + output, errors = execute_command(command) + log.debug("Cleanup script returned: %r" % output) + except: + log.exception("Could not execute Cleanup extension") + + +def execute_member_activated_extension(env_params): + try: + log.debug("Executing member activated extension") + + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MEMBER_ACTIVATED_SCRIPT, False) + command = prepare_command(script_name) + env_params = add_payload_parameters(env_params) + env_params = clean_process_parameters(env_params) + + output, errors = execute_command(command, env_params) + log.debug("Member activated script returned: %r" % output) + except: + log.exception("Could not execute member activated extension") + + +def prepare_command(script_name): + extensions_dir = cartridge_agent_config.read_property( + cartridgeagentconstants.EXTENSIONS_DIR, False) + if extensions_dir.strip() == "": + raise RuntimeError("System property not found: %r" % cartridgeagentconstants.EXTENSIONS_DIR) + + file_path = extensions_dir + script_name if str(extensions_dir).endswith("/") \ + else extensions_dir + "/" + script_name + + if os.path.isfile(file_path): + return file_path + + raise IOError("Script file not found : %r" % file_path) + + +def clean_process_parameters(params): + """ + Removes any null valued parameters before passing them to the extension scripts + :param dict params: + :return: cleaned parameters + :rtype: dict + """ + for key, value in params.items(): + if value is None: + del params[key] + + return params + + +def add_payload_parameters(env_params): + """ + Adds the common parameters to be used by the extension scripts + :param dict[str, str] env_params: Dictionary to be added + :return: Dictionary with updated parameters + :rtype: dict[str, str] + """ + env_params["STRATOS_APP_PATH"] = cartridge_agent_config.app_path + env_params["STRATOS_PARAM_FILE_PATH"] = cartridge_agent_config.read_property( + cartridgeagentconstants.PARAM_FILE_PATH, False) + env_params["STRATOS_SERVICE_NAME"] = cartridge_agent_config.service_name + env_params["STRATOS_TENANT_ID"] = cartridge_agent_config.tenant_id + env_params["STRATOS_CARTRIDGE_KEY"] = cartridge_agent_config.cartridge_key + env_params["STRATOS_LB_CLUSTER_ID"] = cartridge_agent_config.lb_cluster_id + env_params["STRATOS_CLUSTER_ID"] = cartridge_agent_config.cluster_id + env_params["STRATOS_NETWORK_PARTITION_ID"] = \ + cartridge_agent_config.network_partition_id + env_params["STRATOS_PARTITION_ID"] = cartridge_agent_config.partition_id + env_params["STRATOS_PERSISTENCE_MAPPINGS"] = \ + cartridge_agent_config.persistence_mappings + env_params["STRATOS_REPO_URL"] = cartridge_agent_config.repo_url + + lb_cluster_id_in_payload = cartridge_agent_config.lb_cluster_id + member_ips = get_lb_member_ip(lb_cluster_id_in_payload) + if member_ips is not None: + env_params["STRATOS_LB_IP"] = member_ips[0] + env_params["STRATOS_LB_PUBLIC_IP"] = member_ips[1] + else: + env_params["STRATOS_LB_IP"] = cartridge_agent_config.lb_private_ip + env_params["STRATOS_LB_PUBLIC_IP"] = cartridge_agent_config.lb_public_ip + + topology = TopologyContext.get_topology() + if topology.initialized: + service = topology.get_service(cartridge_agent_config.service_name) + cluster = service.get_cluster(cartridge_agent_config.cluster_id) + member_id_in_payload = cartridge_agent_config.member_id + add_properties(service.properties, env_params, "SERVICE_PROPERTY") + add_properties(cluster.properties, env_params, "CLUSTER_PROPERTY") + add_properties(cluster.get_member(member_id_in_payload).properties, env_params, "MEMBER_PROPERTY") + + return env_params + + +def add_properties(properties, params, prefix): + """ + Adds the given property list to the parameters list with given prefix in the parameter name + :param dict[str, str] properties: service properties + :param dict[str, str] params: + :param str prefix: + :return: dict[str, str] + """ + if properties is None or properties.items() is None: + return + + for key in properties: + params["STRATOS_" + prefix + "_" + key] = str(properties[key]) + log.debug("Property added: [key] STRATOS_ " + prefix + "_" + key + "[value] " + properties[key]) + + +def get_lb_member_ip(lb_cluster_id): + topology = TopologyContext.get_topology() + services = topology.get_services() + + for service in services: + clusters = service.get_clusters() + for cluster in clusters: + members = cluster.get_members() + for member in members: + if member.cluster_id == lb_cluster_id: + return [member.member_ip, member.member_public_ip] + + return None + + +def execute_command(command, env_params=None): + """ + Executes the given command string with given environment parameters + :param str command: Command with arguments to be executed + :param dict[str, str] env_params: Environment variables to be used + :return: output and error string tuple, RuntimeError if errors occur + :rtype: tuple + :exception: RuntimeError + """ + os_env = os.environ.copy() + if env_params is not None: + os_env.update(env_params) + + p = subprocess.Popen([command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os_env) + output, errors = p.communicate() + log.debug("output = %r" % output) + log.debug("error = %r" % errors) + if len(errors) > 0: + raise RuntimeError("Command execution failed: \n %r" % errors) + + return output, errors + + +from .. topology.topologycontext import * \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridgeagent/modules/util/log.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridgeagent/modules/util/log.py b/tools/python-cartridge-agent/cartridgeagent/modules/util/log.py new file mode 100644 index 0000000..9bad214 --- /dev/null +++ b/tools/python-cartridge-agent/cartridgeagent/modules/util/log.py @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +import logging.config +import os + + +class LogFactory(object): + """ + Singleton implementation for handling logging in CartridgeAgent + """ + class __LogFactory: + def __init__(self): + self.logs = {} + logging_conf = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "logging.ini" + logging.config.fileConfig(logging_conf) + + def get_log(self, name): + if name not in self.logs: + self.logs[name] = logging.getLogger(name) + + return self.logs[name] + + instance = None + + def __new__(cls, *args, **kwargs): + if not LogFactory.instance: + LogFactory.instance = LogFactory.__LogFactory() + + return LogFactory.instance + + def get_log(self, name): + """ + Returns a logger class with the specified channel name. Creates a new logger if one doesn't exists for the + specified channel + :param str name: Channel name + :return: The logger class + :rtype: RootLogger + """ + return self.instance.get_log(name) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/test/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/test/__init__.py b/tools/python-cartridge-agent/test/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/test/test_util.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/test/test_util.py b/tools/python-cartridge-agent/test/test_util.py new file mode 100644 index 0000000..2439cf2 --- /dev/null +++ b/tools/python-cartridge-agent/test/test_util.py @@ -0,0 +1,4 @@ +from cartridgeagent.modules.util.asyncscheduledtask import AsyncScheduledTask + +def test_async_task(): + assert True \ No newline at end of file
