http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/extensions/defaultextensionhandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/extensions/defaultextensionhandler.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/extensions/defaultextensionhandler.py deleted file mode 100644 index 10e783a..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/extensions/defaultextensionhandler.py +++ /dev/null @@ -1,793 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import time -import json - -from abstractextensionhandler import AbstractExtensionHandler -from ..util import extensionutils, cartridgeagentutils - - -class DefaultExtensionHandler(AbstractExtensionHandler): - """ - Default implementation of the AbstractExtensionHandler - """ - log = None - - def __init__(self): - self.log = LogFactory().get_log(__name__) - self.wk_members = [] - self.cartridge_agent_config = CartridgeAgentConfiguration() - - def on_instance_started_event(self): - try: - self.log.debug("Processing instance started event...") - if self.cartridge_agent_config.is_multitenant: - artifact_source = "%r/repository/deployment/server/" % self.cartridge_agent_config.app_path - artifact_dest = cartridgeagentconstants.SUPERTENANT_TEMP_PATH - extensionutils.execute_copy_artifact_extension(artifact_source, artifact_dest) - - env_params = {} - extensionutils.execute_instance_started_extension(env_params) - except: - self.log.exception("Error processing instance started event") - - def on_instance_activated_event(self): - extensionutils.execute_instance_activated_extension() - - def on_artifact_updated_event(self, artifacts_updated_event): - self.log.info("Artifact update event received: [tenant] %r [cluster] %r [status] %r" % - (artifacts_updated_event.tenant_id, artifacts_updated_event.cluster_id, - artifacts_updated_event.status)) - - cluster_id_event = str(artifacts_updated_event.cluster_id).strip() - cluster_id_payload = self.cartridge_agent_config.cluster_id - repo_url = str(artifacts_updated_event.repo_url).strip() - - if (repo_url != "") and (cluster_id_payload is not None) and (cluster_id_payload == cluster_id_event): - local_repo_path = self.cartridge_agent_config.app_path - - secret = self.cartridge_agent_config.cartridge_key - #repoPassword = https://mb_ip:9443/stratosmetadataservice/app_id/alias/repoPassword - repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret) - - repo_username = artifacts_updated_event.repo_username - tenant_id = artifacts_updated_event.tenant_id - is_multitenant = self.cartridge_agent_config.is_multitenant - commit_enabled = artifacts_updated_event.commit_enabled - - self.log.info("Executing git checkout") - - # create repo object - repo_info = RepositoryInformation(repo_url, repo_username, repo_password, local_repo_path, tenant_id, - is_multitenant, commit_enabled) - - # checkout code - subscribe_run, repo_context = agentgithandler.AgentGitHandler.checkout(repo_info) - # repo_context = checkout_result["repo_context"] - # execute artifact updated extension - env_params = {"STRATOS_ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id, - "STRATOS_ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id, - "STRATOS_ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url, - "STRATOS_ARTIFACT_UPDATED_REPO_PASSWORD": artifacts_updated_event.repo_password, - "STRATOS_ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username, - "STRATOS_ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status} - - extensionutils.execute_artifacts_updated_extension(env_params) - - if subscribe_run: - # publish instanceActivated - cartridgeagentpublisher.publish_instance_activated_event() - - update_artifacts = self.cartridge_agent_config.read_property(cartridgeagentconstants.ENABLE_ARTIFACT_UPDATE, False) - update_artifacts = True if str(update_artifacts).strip().lower() == "true" else False - if update_artifacts: - auto_commit = self.cartridge_agent_config.is_commits_enabled - auto_checkout = self.cartridge_agent_config.is_checkout_enabled - - try: - update_interval = int( - self.cartridge_agent_config.read_property(cartridgeagentconstants.ARTIFACT_UPDATE_INTERVAL)) - except ParameterNotFoundException: - self.log.exception("Invalid artifact sync interval specified ") - update_interval = 10 - except ValueError: - self.log.exception("Invalid artifact sync interval specified ") - update_interval = 10 - - self.log.info("Artifact updating task enabled, update interval: %r seconds" % update_interval) - - self.log.info("Auto Commit is turned %r " % ("on" if auto_commit else "off")) - self.log.info("Auto Checkout is turned %r " % ("on" if auto_checkout else "off")) - - agentgithandler.AgentGitHandler.schedule_artifact_update_scheduled_task( - repo_info, - auto_checkout, - auto_commit, - update_interval) - - def on_artifact_update_scheduler_event(self, tenant_id): - env_params = {"STRATOS_ARTIFACT_UPDATED_TENANT_ID": str(tenant_id), "STRATOS_ARTIFACT_UPDATED_SCHEDULER": str(True)} - - extensionutils.execute_artifacts_updated_extension(env_params) - - def on_instance_cleanup_cluster_event(self, instance_cleanup_cluster_event): - self.cleanup() - - def on_instance_cleanup_member_event(self, instance_cleanup_member_event): - self.cleanup() - - def on_member_activated_event(self, member_activated_event): - self.log.info("Member activated event received: [service] %r [cluster] %r [member] %r" - % (member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id)) - - topology_consistent = extensionutils.check_topology_consistency( - member_activated_event.service_name, - member_activated_event.cluster_id, - member_activated_event.member_id) - - if not topology_consistent: - self.log.error("Topology is inconsistent...failed to execute member activated event") - return - - topology = TopologyContext.get_topology() - service = topology.get_service(member_activated_event.service_name) - cluster = service.get_cluster(member_activated_event.cluster_id) - member = cluster.get_member(member_activated_event.member_id) - lb_cluster_id = member.lb_cluster_id - - if extensionutils.is_relevant_member_event(member_activated_event.service_name, - member_activated_event.cluster_id, lb_cluster_id): - - env_params = {"STRATOS_MEMBER_ACTIVATED_MEMBER_IP": str(member_activated_event.member_ip), - "STRATOS_MEMBER_ACTIVATED_MEMBER_ID": str(member_activated_event.member_id), - "STRATOS_MEMBER_ACTIVATED_CLUSTER_ID": str(member_activated_event.cluster_id), - "STRATOS_MEMBER_ACTIVATED_LB_CLUSTER_ID": str(lb_cluster_id), - "STRATOS_MEMBER_ACTIVATED_NETWORK_PARTITION_ID": str(member_activated_event.network_partition_id), - "STRATOS_MEMBER_ACTIVATED_SERVICE_NAME": str(member_activated_event.service_name)} - - ports = member_activated_event.port_map.values() - ports_str = "" - for port in ports: - ports_str += port.protocol + "," + str(port.value) + "," + str(port.proxy) + "|" - - env_params["STRATOS_MEMBER_ACTIVATED_PORTS"] = ports_str - - env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_LIST_JSON"] = json.dumps(cluster.member_list_json) - - member_ips = extensionutils.get_lb_member_ip(lb_cluster_id) - if member_ips is not None and len(member_ips) > 1: - env_params["STRATOS_MEMBER_ACTIVATED_LB_IP"] = str(member_ips[0]) - env_params["STRATOS_MEMBER_ACTIVATED_LB_PUBLIC_IP"] = str(member_ips[1]) - - env_params["STRATOS_TOPOLOGY_JSON"] = json.dumps(topology.json_str) - - extensionutils.add_properties(service.properties, env_params, "MEMBER_ACTIVATED_SERVICE_PROPERTY") - extensionutils.add_properties(cluster.properties, env_params, "MEMBER_ACTIVATED_CLUSTER_PROPERTY") - extensionutils.add_properties(member.properties, env_params, "MEMBER_ACTIVATED_MEMBER_PROPERTY") - - clustered = self.cartridge_agent_config.is_clustered - - if member.properties is not None and cartridgeagentconstants.CLUSTERING_PRIMARY_KEY in member.properties \ - and member.properties[cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" \ - and clustered is not None and clustered: - - self.log.debug(" If WK member is re-spawned, update axis2.xml ") - - has_wk_ip_changed = True - for wk_member in self.wk_members: - if wk_member.member_ip == member_activated_event.member_ip: - has_wk_ip_changed = False - - self.log.debug(" hasWKIpChanged %r" + has_wk_ip_changed) - - min_count = int(self.cartridge_agent_config.min_count) - is_wk_member_grp_ready = self.is_wk_member_group_ready(env_params, min_count) - self.log.debug("MinCount: %r" % min_count) - self.log.debug("is_wk_member_grp_ready : %r" % is_wk_member_grp_ready) - - if has_wk_ip_changed and is_wk_member_grp_ready: - self.log.debug("Setting env var STRATOS_UPDATE_WK_IP to true") - env_params["STRATOS_UPDATE_WK_IP"] = "true" - - self.log.debug("Setting env var STRATOS_CLUSTERING to %r" % clustered) - env_params["STRATOS_CLUSTERING"] = str(clustered) - env_params["STRATOS_WK_MEMBER_COUNT"] = str(self.cartridge_agent_config.min_count) - - extensionutils.execute_member_activated_extension(env_params) - else: - self.log.debug("Member activated event is not relevant...skipping agent extension") - - def on_complete_topology_event(self, complete_topology_event): - self.log.debug("Complete topology event received") - - service_name_in_payload = self.cartridge_agent_config.service_name - cluster_id_in_payload = self.cartridge_agent_config.cluster_id - member_id_in_payload = self.cartridge_agent_config.member_id - - consistant = extensionutils.check_topology_consistency( - service_name_in_payload, - cluster_id_in_payload, - member_id_in_payload) - - if not consistant: - return - else: - self.cartridge_agent_config.initialized = True - - topology = complete_topology_event.get_topology() - service = topology.get_service(service_name_in_payload) - cluster = service.get_cluster(cluster_id_in_payload) - - env_params = {"STRATOS_TOPOLOGY_JSON": json.dumps(topology.json_str), "STRATOS_MEMBER_LIST_JSON": json.dumps(cluster.member_list_json)} - - extensionutils.execute_complete_topology_extension(env_params) - - def on_instance_spawned_event(self, instance_spawned_event): - self.log.debug("Instance Spawned event received") - - service_name_in_payload = self.cartridge_agent_config.service_name - cluster_id_in_payload = self.cartridge_agent_config.cluster_id - member_id_in_payload = self.cartridge_agent_config.member_id - - consistant = extensionutils.check_topology_consistency( - service_name_in_payload, - cluster_id_in_payload, - member_id_in_payload) - - if not consistant: - return - else: - self.cartridge_agent_config.initialized = True - - def on_complete_tenant_event(self, complete_tenant_event): - self.log.debug("Complete tenant event received") - - tenant_list_json = complete_tenant_event.tenant_list_json - self.log.debug("Complete tenants:" + json.dumps(tenant_list_json)) - - env_params = {"STRATOS_TENANT_LIST_JSON": json.dumps(tenant_list_json)} - - extensionutils.execute_complete_tenant_extension(env_params) - - def on_member_terminated_event(self, member_terminated_event): - self.log.info("Member terminated event received: [service] " + member_terminated_event.service_name + - " [cluster] " + member_terminated_event.cluster_id - + " [member] " + member_terminated_event.member_id) - - topology_consistent = extensionutils.check_topology_consistency( - member_terminated_event.service_name, - member_terminated_event.cluster_id, - member_terminated_event.member_id - ) - - if not topology_consistent: - self.log.error("Topology is inconsistent...failed to execute member terminated event") - return - - topology = TopologyContext.get_topology() - service = topology.get_service(member_terminated_event.service_name) - cluster = service.get_cluster(member_terminated_event.cluster_id) - terminated_member = cluster.get_member(member_terminated_event.member_id) - lb_cluster_id = cluster.get_member(member_terminated_event.member_id).lb_cluster_id - - #check whether terminated member is within the same cluster, LB cluster or service group - if extensionutils.is_relevant_member_event( - member_terminated_event.service_name, - member_terminated_event.cluster_id, - lb_cluster_id): - - env_params = {"STRATOS_MEMBER_TERMINATED_MEMBER_IP": terminated_member.member_ip, - "STRATOS_MEMBER_TERMINATED_MEMBER_ID": member_terminated_event.member_id, - "STRATOS_MEMBER_TERMINATED_CLUSTER_ID": member_terminated_event.cluster_id, - "STRATOS_MEMBER_TERMINATED_LB_CLUSTER_ID": lb_cluster_id, - "STRATOS_MEMBER_TERMINATED_NETWORK_PARTITION_ID": member_terminated_event.network_partition_id, - "STRATOS_MEMBER_TERMINATED_SERVICE_NAME": member_terminated_event.service_name, - "STRATOS_MEMBER_TERMINATED_MEMBER_LIST_JSON": json.dumps(cluster.member_list_json), - "STRATOS_TOPOLOGY_JSON": json.dumps(topology.json_str)} - - member_ips = extensionutils.get_lb_member_ip(lb_cluster_id) - if member_ips is not None and len(member_ips) > 1: - env_params["STRATOS_MEMBER_TERMINATED_LB_IP"] = member_ips[0] - env_params["STRATOS_MEMBER_TERMINATED_LB_PUBLIC_IP"] = member_ips[1] - - extensionutils.add_properties(service.properties, env_params, "MEMBER_TERMINATED_SERVICE_PROPERTY") - extensionutils.add_properties(cluster.properties, env_params, "MEMBER_TERMINATED_CLUSTER_PROPERTY") - extensionutils.add_properties(terminated_member.properties, env_params, "MEMBER_TERMINATED_MEMBER_PROPERTY") - - extensionutils.execute_member_terminated_extension(env_params) - - else: - self.log.debug("Member terminated event is not relevant...skipping agent extension") - - def on_member_suspended_event(self, member_suspended_event): - self.log.info("Member suspended event received: [service] " + member_suspended_event.service_name + - " [cluster] " + member_suspended_event.cluster_id + " [member] " + member_suspended_event.member_id) - - topology_consistent = extensionutils.check_topology_consistency( - member_suspended_event.service_name, - member_suspended_event.cluster_id, - member_suspended_event.member_id - ) - - if not topology_consistent: - self.log.error("Topology is inconsistent...failed to execute member suspended event") - return - - topology = TopologyContext.get_topology() - service = topology.get_service(member_suspended_event.service_name) - cluster = service.get_cluster(member_suspended_event.cluster_id) - suspended_member = cluster.get_member(member_suspended_event.member_id) - lb_cluster_id = cluster.get_member(member_suspended_event.member_id).lb_cluster_id - - #check whether suspended member is within the same cluster, LB cluster or service group - if extensionutils.is_relevant_member_event( - member_suspended_event.service_name, - member_suspended_event.cluster_id, - lb_cluster_id): - - env_params = {"STRATOS_MEMBER_SUSPENDED_MEMBER_IP": member_suspended_event.member_ip, - "STRATOS_MEMBER_SUSPENDED_MEMBER_ID": member_suspended_event.member_id, - "STRATOS_MEMBER_SUSPENDED_CLUSTER_ID": member_suspended_event.cluster_id, - "STRATOS_MEMBER_SUSPENDED_LB_CLUSTER_ID": lb_cluster_id, - "STRATOS_MEMBER_SUSPENDED_NETWORK_PARTITION_ID": member_suspended_event.network_partition_id, - "STRATOS_MEMBER_SUSPENDED_SERVICE_NAME": member_suspended_event.service_name, - "STRATOS_MEMBER_SUSPENDED_MEMBER_LIST_JSON": json.dumps(cluster.member_list_json), - "STRATOS_TOPOLOGY_JSON": json.dumps(topology.json_str)} - - member_ips = extensionutils.get_lb_member_ip(lb_cluster_id) - if member_ips is not None and len(member_ips) > 1: - env_params["STRATOS_MEMBER_SUSPENDED_LB_IP"] = member_ips[0] - env_params["STRATOS_MEMBER_SUSPENDED_LB_PUBLIC_IP"] = member_ips[1] - - extensionutils.add_properties(service.properties, env_params, "MEMBER_SUSPENDED_SERVICE_PROPERTY") - extensionutils.add_properties(cluster.properties, env_params, "MEMBER_SUSPENDED_CLUSTER_PROPERTY") - extensionutils.add_properties(suspended_member.properties, env_params, "MEMBER_SUSPENDED_MEMBER_PROPERTY") - - extensionutils.execute_member_suspended_extension(env_params) - - else: - self.log.debug("Member suspended event is not relevant...skipping agent extension") - - def on_member_started_event(self, member_started_event): - self.log.info("Member started event received: [service] " + member_started_event.service_name + - " [cluster] " + member_started_event.cluster_id + " [member] " + member_started_event.member_id) - - topology_consistent = extensionutils.check_topology_consistency( - member_started_event.service_name, - member_started_event.cluster_id, - member_started_event.member_id - ) - - if not topology_consistent: - self.log.error("Topology is inconsistent...failed to execute member started event") - return - - topology = TopologyContext.get_topology() - service = topology.get_service(member_started_event.service_name) - cluster = service.get_cluster(member_started_event.cluster_id) - started_member = cluster.get_member(member_started_event.member_id) - lb_cluster_id = cluster.get_member(member_started_event.member_id).lb_cluster_id - - #check whether started member is within the same cluster, LB cluster or service group - if extensionutils.is_relevant_member_event( - member_started_event.service_name, - member_started_event.cluster_id, - lb_cluster_id): - - env_params = {"STRATOS_MEMBER_STARTED_MEMBER_IP": started_member.member_ip, - "STRATOS_MEMBER_STARTED_MEMBER_ID": member_started_event.member_id, - "STRATOS_MEMBER_STARTED_CLUSTER_ID": member_started_event.cluster_id, - "STRATOS_MEMBER_STARTED_LB_CLUSTER_ID": lb_cluster_id, - "STRATOS_MEMBER_STARTED_NETWORK_PARTITION_ID": member_started_event.network_partition_id, - "STRATOS_MEMBER_STARTED_SERVICE_NAME": member_started_event.service_name, - "STRATOS_MEMBER_STARTED_MEMBER_LIST_JSON": json.dumps(cluster.member_list_json), - "STRATOS_TOPOLOGY_JSON": json.dumps(topology.json_str)} - - member_ips = extensionutils.get_lb_member_ip(lb_cluster_id) - if member_ips is not None and len(member_ips) > 1: - env_params["STRATOS_MEMBER_STARTED_LB_IP"] = member_ips[0] - env_params["STRATOS_MEMBER_STARTED_LB_PUBLIC_IP"] = member_ips[1] - - extensionutils.add_properties(service.properties, env_params, "MEMBER_STARTED_SERVICE_PROPERTY") - extensionutils.add_properties(cluster.properties, env_params, "MEMBER_STARTED_CLUSTER_PROPERTY") - extensionutils.add_properties(started_member.properties, env_params, "MEMBER_STARTED_MEMBER_PROPERTY") - - extensionutils.execute_member_started_extension(env_params) - - else: - self.log.debug("Member started event is not relevant...skipping agent extension") - - def start_server_extension(self): - #wait until complete topology message is received to get LB IP - extensionutils.wait_for_complete_topology() - self.log.info("[start server extension] complete topology event received") - - service_name_in_payload = self.cartridge_agent_config.service_name - cluster_id_in_payload = self.cartridge_agent_config.cluster_id - member_id_in_payload = self.cartridge_agent_config.member_id - - topology_consistant = extensionutils.check_topology_consistency(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) - - try: - if not topology_consistant: - self.log.error("Topology is inconsistent...failed to execute start server event") - return - - topology = TopologyContext.get_topology() - service = topology.get_service(service_name_in_payload) - cluster = service.get_cluster(cluster_id_in_payload) - - # store environment variable parameters to be passed to extension shell script - env_params = {} - - # if clustering is enabled wait until all well known members have started - clustering_enabled = self.cartridge_agent_config.is_clustered - if clustering_enabled: - env_params["STRATOS_CLUSTERING"] = "true" - env_params["STRATOS_WK_MEMBER_COUNT"] = self.cartridge_agent_config.min_count - - env_params["STRATOS_PRIMARY"] = "true" if self.cartridge_agent_config.is_primary else "false" - - self.wait_for_wk_members(env_params) - self.log.info("All well known members have started! Resuming start server extension...") - - env_params["STRATOS_TOPOLOGY_JSON"] = json.dumps(topology.json_str) - env_params["STRATOS_MEMBER_LIST_JSON"] = json.dumps(cluster.member_list_json) - - extensionutils.execute_start_servers_extension(env_params) - - except: - self.log.exception("Error processing start servers event") - - def volume_mount_extension(self, persistence_mappings_payload): - extensionutils.execute_volume_mount_extension(persistence_mappings_payload) - - def on_subscription_domain_added_event(self, subscription_domain_added_event): - tenant_domain = self.find_tenant_domain(subscription_domain_added_event.tenant_id) - self.log.info( - "Subscription domain added event received: [tenant-id] " + subscription_domain_added_event.tenant_id + - " [tenant-domain] " + tenant_domain + " [domain-name] " + subscription_domain_added_event.domain_name + - " [application-context] " + subscription_domain_added_event.application_context - ) - - env_params = {"STRATOS_SUBSCRIPTION_SERVICE_NAME": subscription_domain_added_event.service_name, - "STRATOS_SUBSCRIPTION_DOMAIN_NAME": subscription_domain_added_event.domain_name, - "STRATOS_SUBSCRIPTION_TENANT_ID": int(subscription_domain_added_event.tenant_id), - "STRATOS_SUBSCRIPTION_TENANT_DOMAIN": tenant_domain, - "STRATOS_SUBSCRIPTION_APPLICATION_CONTEXT": subscription_domain_added_event.application_context} - - extensionutils.execute_subscription_domain_added_extension(env_params) - - def on_subscription_domain_removed_event(self, subscription_domain_removed_event): - tenant_domain = self.find_tenant_domain(subscription_domain_removed_event.tenant_id) - self.log.info( - "Subscription domain removed event received: [tenant-id] " + subscription_domain_removed_event.tenant_id + - " [tenant-domain] " + tenant_domain + " [domain-name] " + subscription_domain_removed_event.domain_name - ) - - env_params = {"STRATOS_SUBSCRIPTION_SERVICE_NAME": subscription_domain_removed_event.service_name, - "STRATOS_SUBSCRIPTION_DOMAIN_NAME": subscription_domain_removed_event.domain_name, - "STRATOS_SUBSCRIPTION_TENANT_ID": int(subscription_domain_removed_event.tenant_id), - "STRATOS_SUBSCRIPTION_TENANT_DOMAIN": tenant_domain} - - extensionutils.execute_subscription_domain_removed_extension(env_params) - - def on_copy_artifacts_extension(self, src, des): - extensionutils.execute_copy_artifact_extension(src, des) - - def on_tenant_subscribed_event(self, tenant_subscribed_event): - self.log.info( - "Tenant subscribed event received: [tenant] " + tenant_subscribed_event.tenant_id + - " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids - ) - - extensionutils.execute_tenant_subscribed_extension({}) - - def on_tenant_unsubscribed_event(self, tenant_unsubscribed_event): - self.log.info( - "Tenant unsubscribed event received: [tenant] " + tenant_unsubscribed_event.tenant_id + - " [service] " + tenant_unsubscribed_event.service_name + - " [cluster] " + tenant_unsubscribed_event.cluster_ids - ) - - try: - if self.cartridge_agent_config.service_name == tenant_unsubscribed_event.service_name: - agentgithandler.AgentGitHandler.remove_repo(tenant_unsubscribed_event.tenant_id) - except: - self.log.exception("Removing git repository failed: ") - extensionutils.execute_tenant_unsubscribed_extension({}) - - def cleanup(self): - self.log.info("Executing cleaning up the data in the cartridge instance...") - - cartridgeagentpublisher.publish_maintenance_mode_event() - - extensionutils.execute_cleanup_extension() - self.log.info("cleaning up finished in the cartridge instance...") - - self.log.info("publishing ready to shutdown event...") - cartridgeagentpublisher.publish_instance_ready_to_shutdown_event() - - def is_wk_member_group_ready(self, env_params, min_count): - topology = TopologyContext.get_topology() - if topology is None or not topology.initialized: - return False - - service_group_in_payload = self.cartridge_agent_config.service_group - if service_group_in_payload is not None: - env_params["STRATOS_SERVICE_GROUP"] = service_group_in_payload - - # clustering logic for apimanager - if service_group_in_payload is not None and service_group_in_payload == "apim": - # handle apistore and publisher case - if self.cartridge_agent_config.service_name == cartridgeagentconstants.APIMANAGER_SERVICE_NAME or \ - self.cartridge_agent_config.service_name == cartridgeagentconstants.PUBLISHER_SERVICE_NAME: - - apistore_cluster_collection = topology.get_service(cartridgeagentconstants.APIMANAGER_SERVICE_NAME)\ - .get_clusters() - publisher_cluster_collection = topology.get_service(cartridgeagentconstants.PUBLISHER_SERVICE_NAME)\ - .get_clusters() - - apistore_member_list = [] - for member in apistore_cluster_collection[0].get_members(): - if member.status == MemberStatus.Starting or member.status == MemberStatus.Activated: - apistore_member_list.append(member) - self.wk_members.append(member) - - if len(apistore_member_list) == 0: - self.log.debug("API Store members not yet created") - return False - - apistore_member = apistore_member_list[0] - env_params["STRATOS_WK_APISTORE_MEMBER_IP"] = apistore_member.member_ip - self.log.debug("STRATOS_WK_APISTORE_MEMBER_IP: %r" % apistore_member.member_ip) - - publisher_member_list = [] - for member in publisher_cluster_collection[0].get_members(): - if member.status == MemberStatus.Starting or member.status == MemberStatus.Activated: - publisher_member_list.append(member) - self.wk_members.append(member) - - if len(publisher_member_list) == 0: - self.log.debug("API Publisher members not yet created") - - publisher_member = publisher_member_list[0] - env_params["STRATOS_WK_PUBLISHER_MEMBER_IP"] = publisher_member.member_ip - self.log.debug("STRATOS_WK_PUBLISHER_MEMBER_IP: %r" % publisher_member.member_ip) - - return True - - elif self.cartridge_agent_config.service_name == cartridgeagentconstants.GATEWAY_MGT_SERVICE_NAME or \ - self.cartridge_agent_config.service_name == cartridgeagentconstants.GATEWAY_SERVICE_NAME: - - if self.cartridge_agent_config.deployment is not None: - # check if deployment is Manager Worker separated - if self.cartridge_agent_config.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \ - self.cartridge_agent_config.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): - - self.log.info("Deployment pattern for the node: %r" % self.cartridge_agent_config.deployment) - env_params["DEPLOYMENT"] = self.cartridge_agent_config.deployment - # check if WKA members of Manager Worker separated deployment is ready - return self.is_manager_worker_WKA_group_ready(env_params) - - elif self.cartridge_agent_config.service_name == cartridgeagentconstants.KEY_MANAGER_SERVICE_NAME: - return True - - else: - if self.cartridge_agent_config.deployment is not None: - # check if deployment is Manager Worker separated - if self.cartridge_agent_config.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \ - self.cartridge_agent_config.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): - - self.log.info("Deployment pattern for the node: %r" % self.cartridge_agent_config.deployment) - env_params["DEPLOYMENT"] = self.cartridge_agent_config.deployment - # check if WKA members of Manager Worker separated deployment is ready - return self.is_manager_worker_WKA_group_ready(env_params) - - service_name_in_payload = self.cartridge_agent_config.service_name - cluster_id_in_payload = self.cartridge_agent_config.cluster_id - service = topology.get_service(service_name_in_payload) - cluster = service.get_cluster(cluster_id_in_payload) - - wk_members = [] - for member in cluster.get_members(): - if member.properties is not None and \ - cartridgeagentconstants.PRIMARY in member.properties \ - and member.properties[cartridgeagentconstants.PRIMARY].lower() == "true" and \ - (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated): - - wk_members.append(member) - self.wk_members.append(member) - self.log.debug("Found WKA: STRATOS_WK_MEMBER_IP: " + member.member_ip) - - if len(wk_members) >= min_count: - idx = 0 - for member in wk_members: - env_params["STRATOS_WK_MEMBER_" + idx + "_IP"] = member.member_ip - self.log.debug("STRATOS_WK_MEMBER_" + idx + "_IP:" + member.member_ip) - - idx += 1 - - return True - - return False - - # generic worker manager separated clustering logic - def is_manager_worker_WKA_group_ready(self, env_params): - - # for this, we need both manager cluster service name and worker cluster service name - manager_service_name = self.cartridge_agent_config.manager_service_name - worker_service_name = self.cartridge_agent_config.worker_service_name - - # managerServiceName and workerServiceName both should not be null /empty - if manager_service_name is None or manager_service_name.strip() == "": - self.log.error("Manager service name [ " + manager_service_name + " ] is invalid") - return False - - if worker_service_name is None or worker_service_name.strip() == "": - self.log.error("Worker service name [ " + worker_service_name + " ] is invalid") - return False - - min_manager_instances_available = False - min_worker_instances_available = False - - topology = TopologyContext.get_topology() - manager_service = topology.get_service(manager_service_name) - worker_service = topology.get_service(worker_service_name) - - if manager_service is None: - self.log.warn("Service [ " + manager_service_name + " ] is not found") - return False - - if worker_service is None: - self.log.warn("Service [ " + worker_service_name + " ] is not found") - return False - - # manager clusters - manager_clusters = manager_service.get_clusters() - if manager_clusters is None or len(manager_clusters) == 0: - self.log.warn("No clusters found for service [ " + manager_service_name + " ]") - return False - - manager_min_instance_count = 1 - manager_min_instance_count_found = False - - manager_wka_members = [] - for member in manager_clusters[0].get_members(): - if member.properties is not None and \ - cartridgeagentconstants.PRIMARY in member.properties \ - and member.properties[cartridgeagentconstants.PRIMARY].lower() == "true" and \ - (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated): - - manager_wka_members.append(member) - self.wk_members.append(member) - - # get the min instance count - if not manager_min_instance_count_found: - manager_min_instance_count = self.get_min_instance_count_from_member(member) - manager_min_instance_count_found = True - self.log.info("Manager min instance count: " + manager_min_instance_count) - - if len(manager_wka_members) >= manager_min_instance_count: - min_manager_instances_available = True - idx = 0 - for member in manager_wka_members: - env_params["STRATOS_WK_MANAGER_MEMBER_" + idx + "_IP"] = member.member_ip - self.log.debug("STRATOS_WK_MANAGER_MEMBER_" + idx + "_IP: " + member.member_ip) - idx += 1 - - env_params["STRATOS_WK_MANAGER_MEMBER_COUNT"] = int(manager_min_instance_count) - - # If all the manager members are non primary and is greate than or equal to mincount, - # minManagerInstancesAvailable should be true - all_managers_non_primary = True - for member in manager_clusters[0].get_members(): - # get the min instance count - if not manager_min_instance_count_found: - manager_min_instance_count = self.get_min_instance_count_from_member(member) - manager_min_instance_count_found = True - self.log.info( - "Manager min instance count when allManagersNonPrimary true : " + manager_min_instance_count) - - if member.properties is not None and cartridgeagentconstants.PRIMARY in member.properties and \ - member.properties[cartridgeagentconstants.PRIMARY].lower() == "true": - all_managers_non_primary = False - break - - self.log.debug( - " allManagerNonPrimary & managerMinInstanceCount [" + all_managers_non_primary + - "], [" + manager_min_instance_count + "] ") - - if all_managers_non_primary and len(manager_clusters) >= manager_min_instance_count: - min_manager_instances_available = True - - # worker cluster - worker_clusters = worker_service.get_clusters() - if worker_clusters is None or len(worker_clusters) == 0: - self.log.warn("No clusters found for service [ " + worker_service_name + " ]") - return False - - worker_min_instance_count = 1 - worker_min_instance_count_found = False - - worker_wka_members = [] - for member in worker_clusters[0].get_members(): - self.log.debug("Checking member : " + member.member_id) - - if member.properties is not None and cartridgeagentconstants.PRIMARY in member.properties and \ - member.properties[cartridgeagentconstants.PRIMARY].lower() == "true" and \ - (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated): - - self.log.debug("Added worker member " + member.member_id) - - worker_wka_members.append(member) - self.wk_members.append(member) - - # get the min instance count - if not worker_min_instance_count_found: - worker_min_instance_count = self.get_min_instance_count_from_member(member) - worker_min_instance_count_found = True - self.log.debug("Worker min instance count: " + worker_min_instance_count) - - if len(worker_wka_members) >= worker_min_instance_count: - min_worker_instances_available = True - idx = 0 - for member in worker_wka_members: - env_params["STRATOS_WK_WORKER_MEMBER_" + idx + "_IP"] = member.member_ip - self.log.debug("STRATOS_WK_WORKER_MEMBER_" + idx + "_IP: " + member.member_ip) - idx += 1 - - env_params["STRATOS_WK_WORKER_MEMBER_COUNT"] = int(worker_min_instance_count) - - self.log.debug( - " Returnning values minManagerInstancesAvailable && minWorkerInstancesAvailable [" + - min_manager_instances_available + "], [" + min_worker_instances_available + "] ") - - return min_manager_instances_available and min_worker_instances_available - - def get_min_instance_count_from_member(self, member): - if cartridgeagentconstants.MIN_COUNT in member.properties: - return int(member.properties[cartridgeagentconstants.MIN_COUNT]) - - return 1 - - def find_tenant_domain(self, tenant_id): - tenant = TenantContext.get_tenant(tenant_id) - if tenant is None: - raise RuntimeError("Tenant could not be found: [tenant-id] %r" % tenant_id) - - return tenant.tenant_domain - - def wait_for_wk_members(self, env_params): - min_count = int(self.cartridge_agent_config.min_count) - is_wk_member_group_ready = False - while not is_wk_member_group_ready: - self.log.info("Waiting for %r well known members..." % min_count) - - time.sleep(5) - - is_wk_member_group_ready = self.is_wk_member_group_ready(env_params, min_count) - -from ..artifactmgt.git import agentgithandler -from ..artifactmgt.repositoryinformation import RepositoryInformation -from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration -from ..publisher import cartridgeagentpublisher -from ..exception.parameternotfoundexception import ParameterNotFoundException -from ..topology.topologycontext import * -from ..tenant.tenantcontext import * -from ..util.log import LogFactory \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/__init__.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py deleted file mode 100644 index 685344d..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py +++ /dev/null @@ -1,62 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -class AbstractHealthStatisticsReader: - """ - Abstract class to implement to create a custom health stat reader - """ - - def stat_cartridge_health(self): - """ - Abstract method that when implemented reads the memory usage and the load average - of the instance running the agent and returns a CartridgeHealthStatistics object - with the information - - :return: CartridgeHealthStatistics object with memory usage and load average values - :rtype : CartridgeHealthStatistics - """ - raise NotImplementedError - - -class CartridgeHealthStatistics: - """ - Holds the memory usage and load average reading - """ - - def __init__(self): - self.memory_usage = None - """:type : float""" - self.load_avg = None - """:type : float""" - - -class CEPPublisherException(Exception): - """ - Exception to be used during CEP publishing operations - """ - - def __init__(self, msg): - super(self, msg) - self.message = msg - - def get_message(self): - """ - The message provided when the exception is raised - :return: message - :rtype: str - """ - return self.message http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/healthstats.py deleted file mode 100644 index 6bb574b..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/healthstats.py +++ /dev/null @@ -1,249 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from threading import Thread -import time -import psutil -import os -import multiprocessing - -from abstracthealthstatisticspublisher import * -from ..databridge.agent import * -from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration -from ..util import cartridgeagentutils, cartridgeagentconstants - - -class HealthStatisticsPublisherManager(Thread): - """ - Read from an implementation of AbstractHealthStatisticsPublisher the value for memory usage and - load average and publishes them as ThriftEvents to a CEP server - """ - STREAM_NAME = "cartridge_agent_health_stats" - STREAM_VERSION = "1.0.0" - STREAM_NICKNAME = "agent health stats" - STREAM_DESCRIPTION = "agent health stats" - - def __init__(self, publish_interval): - """ - Initializes a new HealthStatistsPublisherManager with a given number of seconds as the interval - :param int publish_interval: Number of seconds as the interval - :return: void - """ - Thread.__init__(self) - - self.log = LogFactory().get_log(__name__) - - self.publish_interval = publish_interval - """:type : int""" - - self.terminated = False - - self.publisher = HealthStatisticsPublisher() - """:type : HealthStatisticsPublisher""" - # TODO: load plugins for the reader - self.stats_reader = DefaultHealthStatisticsReader() - """:type : AbstractHealthStatisticsReader""" - - def run(self): - while not self.terminated: - time.sleep(self.publish_interval) - - cartridge_stats = self.stats_reader.stat_cartridge_health() - self.log.debug("Publishing memory consumption: %r" % cartridge_stats.memory_usage) - self.publisher.publish_memory_usage(cartridge_stats.memory_usage) - - self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg) - self.publisher.publish_load_average(cartridge_stats.load_avg) - - self.publisher.publisher.disconnect() - - -class HealthStatisticsPublisher: - """ - Publishes memory usage and load average to thrift server - """ - log = LogFactory().get_log(__name__) - - def __init__(self): - - self.ports = [] - self.ports.append(CEPPublisherConfiguration.get_instance().server_port) - - self.cartridge_agent_config = CartridgeAgentConfiguration() - - cartridgeagentutils.wait_until_ports_active( - CEPPublisherConfiguration.get_instance().server_ip, - self.ports, - int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) - cep_active = cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports) - if not cep_active: - raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") - - self.stream_definition = HealthStatisticsPublisher.create_stream_definition() - HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition)) - self.publisher = ThriftPublisher( - CEPPublisherConfiguration.get_instance().server_ip, - CEPPublisherConfiguration.get_instance().server_port, - CEPPublisherConfiguration.get_instance().admin_username, - CEPPublisherConfiguration.get_instance().admin_password, - self.stream_definition) - - HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher initialized") - - @staticmethod - def create_stream_definition(): - """ - Create a StreamDefinition for publishing to CEP - """ - stream_def = StreamDefinition() - stream_def.name = HealthStatisticsPublisherManager.STREAM_NAME - stream_def.version = HealthStatisticsPublisherManager.STREAM_VERSION - stream_def.nickname = HealthStatisticsPublisherManager.STREAM_NICKNAME - stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION - - stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("member_id", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("partition_id", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("health_description", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("value", StreamDefinition.DOUBLE) - - return stream_def - - def publish_memory_usage(self, memory_usage): - """ - Publishes the given memory usage value to the thrift server as a ThriftEvent - :param float memory_usage: memory usage - """ - - event = ThriftEvent() - event.payloadData.append(self.cartridge_agent_config.cluster_id) - event.payloadData.append(self.cartridge_agent_config.network_partition_id) - event.payloadData.append(self.cartridge_agent_config.member_id) - event.payloadData.append(self.cartridge_agent_config.partition_id) - event.payloadData.append(cartridgeagentconstants.MEMORY_CONSUMPTION) - event.payloadData.append(memory_usage) - - HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [version] %r" % (self.stream_definition.name, self.stream_definition.version)) - self.publisher.publish(event) - - def publish_load_average(self, load_avg): - """ - Publishes the given load average value to the thrift server as a ThriftEvent - :param float load_avg: load average value - """ - - event = ThriftEvent() - event.payloadData.append(self.cartridge_agent_config.cluster_id) - event.payloadData.append(self.cartridge_agent_config.network_partition_id) - event.payloadData.append(self.cartridge_agent_config.member_id) - event.payloadData.append(self.cartridge_agent_config.partition_id) - event.payloadData.append(cartridgeagentconstants.LOAD_AVERAGE) - event.payloadData.append(load_avg) - - HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [version] %r" % (self.stream_definition.name, self.stream_definition.version)) - self.publisher.publish(event) - - -class DefaultHealthStatisticsReader(AbstractHealthStatisticsReader): - """ - Default implementation of the AbstractHealthStatisticsReader - """ - - def __init__(self): - self.log = LogFactory().get_log(__name__) - - def stat_cartridge_health(self): - cartridge_stats = CartridgeHealthStatistics() - cartridge_stats.memory_usage = DefaultHealthStatisticsReader.__read_mem_usage() - cartridge_stats.load_avg = DefaultHealthStatisticsReader.__read_load_avg() - - self.log.debug("Memory read: %r, CPU read: %r" % (cartridge_stats.memory_usage, cartridge_stats.load_avg)) - return cartridge_stats - - @staticmethod - def __read_mem_usage(): - return psutil.virtual_memory().percent - - @staticmethod - def __read_load_avg(): - (one, five, fifteen) = os.getloadavg() - cores = multiprocessing.cpu_count() - - return (one/cores) * 100 - - -class CEPPublisherConfiguration: - """ - TODO: Extract common functionality - """ - - __instance = None - log = LogFactory().get_log(__name__) - - @staticmethod - def get_instance(): - """ - Singleton instance retriever - :return: Instance - :rtype : CEPPublisherConfiguration - """ - if CEPPublisherConfiguration.__instance is None: - CEPPublisherConfiguration.__instance = CEPPublisherConfiguration() - - return CEPPublisherConfiguration.__instance - - def __init__(self): - self.enabled = False - self.server_ip = None - self.server_port = None - self.admin_username = None - self.admin_password = None - self.cartridge_agent_config = CartridgeAgentConfiguration() - - self.read_config() - - def read_config(self): - self.enabled = True if self.cartridge_agent_config.read_property( - cartridgeagentconstants.CEP_PUBLISHER_ENABLED, False).strip().lower() == "true" else False - if not self.enabled: - CEPPublisherConfiguration.log.info("CEP Publisher disabled") - return - - CEPPublisherConfiguration.log.info("CEP Publisher enabled") - - self.server_ip = self.cartridge_agent_config.read_property( - cartridgeagentconstants.CEP_RECEIVER_IP, False) - if self.server_ip is None or self.server_ip.strip() == "": - raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_IP) - - self.server_port = self.cartridge_agent_config.read_property( - cartridgeagentconstants.CEP_RECEIVER_PORT, False) - if self.server_port is None or self.server_port.strip() == "": - raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_PORT) - - self.admin_username = self.cartridge_agent_config.read_property( - cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME, False) - if self.admin_username is None or self.admin_username.strip() == "": - raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME) - - self.admin_password = self.cartridge_agent_config.read_property( - cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD, False) - if self.admin_password is None or self.admin_password.strip() == "": - raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD) - - CEPPublisherConfiguration.log.info("CEP Publisher configuration initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/publisher/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/publisher/__init__.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/publisher/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/publisher/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py deleted file mode 100644 index 1ce8ffb..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py +++ /dev/null @@ -1,165 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import logging - -import paho.mqtt.publish as publish - -from .. event.instance.status.events import * -from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration -from .. util import cartridgeagentconstants -from .. healthstatspublisher.healthstats import * -from .. healthstatspublisher.abstracthealthstatisticspublisher import * - - -log = LogFactory().get_log(__name__) - -started = False -activated = False -ready_to_shutdown = False -maintenance = False - -publishers = {} -""" :type : dict[str, EventPublisher] """ - - -def publish_instance_started_event(): - global started, log - if not started: - log.info("Publishing instance started event") - service_name = CartridgeAgentConfiguration().service_name - cluster_id = CartridgeAgentConfiguration().cluster_id - network_partition_id = CartridgeAgentConfiguration().network_partition_id - parition_id = CartridgeAgentConfiguration().partition_id - member_id = CartridgeAgentConfiguration().member_id - - instance_started_event = InstanceStartedEvent(service_name, cluster_id, network_partition_id, parition_id, - member_id) - publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_STARTED_EVENT) - publisher.publish(instance_started_event) - started = True - log.info("Instance started event published") - else: - log.warn("Instance already started") - - -def publish_instance_activated_event(): - global activated, log - if not activated: - log.info("Publishing instance activated event") - service_name = CartridgeAgentConfiguration().service_name - cluster_id = CartridgeAgentConfiguration().cluster_id - network_partition_id = CartridgeAgentConfiguration().network_partition_id - parition_id = CartridgeAgentConfiguration().partition_id - member_id = CartridgeAgentConfiguration().member_id - - instance_activated_event = InstanceActivatedEvent(service_name, cluster_id, network_partition_id, parition_id, - member_id) - publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_ACTIVATED_EVENT) - publisher.publish(instance_activated_event) - - log.info("Instance activated event published") - log.info("Starting health statistics notifier") - - if CEPPublisherConfiguration.get_instance().enabled: - interval_default = 15 # seconds - interval = CartridgeAgentConfiguration().read_property("stats.notifier.interval", False) - if interval is not None and len(interval) > 0: - try: - interval = int(interval) - except ValueError: - interval = interval_default - else: - interval = interval_default - - health_stats_publisher = HealthStatisticsPublisherManager(interval) - log.info("Starting Health statistics publisher with interval %r" % interval_default) - health_stats_publisher.start() - else: - log.warn("Statistics publisher is disabled") - - activated = True - log.info("Health statistics notifier started") - else: - log.warn("Instance already activated") - - -def publish_maintenance_mode_event(): - global maintenance, log - if not maintenance: - log.info("Publishing instance maintenance mode event") - - service_name = CartridgeAgentConfiguration().service_name - cluster_id = CartridgeAgentConfiguration().cluster_id - network_partition_id = CartridgeAgentConfiguration().network_partition_id - parition_id = CartridgeAgentConfiguration().partition_id - member_id = CartridgeAgentConfiguration().member_id - - instance_maintenance_mode_event = InstanceMaintenanceModeEvent(service_name, cluster_id, network_partition_id, parition_id, - member_id) - - publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_MAINTENANCE_MODE_EVENT) - publisher.publish(instance_maintenance_mode_event) - - maintenance = True - log.info("Instance Maintenance mode event published") - else: - log.warn("Instance already in a Maintenance mode....") - - -def publish_instance_ready_to_shutdown_event(): - global ready_to_shutdown, log - if not ready_to_shutdown: - log.info("Publishing instance activated event") - - service_name = CartridgeAgentConfiguration().service_name - cluster_id = CartridgeAgentConfiguration().cluster_id - network_partition_id = CartridgeAgentConfiguration().network_partition_id - parition_id = CartridgeAgentConfiguration().partition_id - member_id = CartridgeAgentConfiguration().member_id - - instance_shutdown_event = InstanceReadyToShutdownEvent(service_name, cluster_id, network_partition_id, parition_id, - member_id) - - publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_READY_TO_SHUTDOWN_EVENT) - publisher.publish(instance_shutdown_event) - - ready_to_shutdown = True - log.info("Instance ReadyToShutDown event published") - else: - log.warn("Instance already in a ReadyToShutDown event....") - - -def get_publisher(topic): - if topic not in publishers: - publishers[topic] = EventPublisher(topic) - - return publishers[topic] - - -class EventPublisher: - """ - Handles publishing events to topics to the provided message broker - """ - def __init__(self, topic): - self.__topic = topic - - def publish(self, event): - mb_ip = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_IP) - mb_port = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_PORT) - payload = event.to_json() - publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/subscriber/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/subscriber/__init__.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/subscriber/__init__.py deleted file mode 100644 index 2456923..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/subscriber/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/subscriber/eventsubscriber.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/subscriber/eventsubscriber.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/subscriber/eventsubscriber.py deleted file mode 100644 index bc026dd..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/subscriber/eventsubscriber.py +++ /dev/null @@ -1,96 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import threading -import paho.mqtt.client as mqtt - - -class EventSubscriber(threading.Thread): - """ - Provides functionality to subscribe to a given topic on the stratos MB and - register event handlers for various events. - """ - - def __init__(self, topic, ip, port): - threading.Thread.__init__(self) - - #{"ArtifactUpdateEvent" : onArtifactUpdateEvent()} - self.__event_handlers = {} - - self.log = LogFactory().get_log(__name__) - - self.__mb_client = None - - self.__topic = topic - - self.__subscribed = False - - self.__ip = ip - self.__port = port - - def run(self): - self.__mb_client = mqtt.Client() - self.__mb_client.on_connect = self.on_connect - self.__mb_client.on_message = self.on_message - - self.log.debug("Connecting to the message broker with address %r:%r" % (self.__ip, self.__port)) - self.__mb_client.connect(self.__ip, self.__port, 60) - self.__subscribed = True - self.__mb_client.loop_forever() - - def register_handler(self, event, handler): - """ - Adds an event handler function mapped to the provided event. - :param str event: Name of the event to attach the provided handler - :param handler: The handler function - :return: void - :rtype: void - """ - self.__event_handlers[event] = handler - self.log.debug("Registered handler for event %r" % event) - - def on_connect(self, client, userdata, flags, rc): - self.log.debug("Connected to message broker.") - self.__mb_client.subscribe(self.__topic) - self.log.debug("Subscribed to %r" % self.__topic) - - def on_message(self, client, userdata, msg): - self.log.debug("Message received: %r:\n%r" % (msg.topic, msg.payload)) - - event = msg.topic.rpartition('/')[2] - - if event in self.__event_handlers: - handler = self.__event_handlers[event] - - try: - self.log.debug("Executing handler for event %r" % event) - handler(msg) - except: - self.log.exception("Error processing %r event" % event) - else: - self.log.debug("Event handler not found for event : %r" % event) - - def is_subscribed(self): - """ - Checks if this event subscriber is successfully subscribed to the provided topic - :return: True if subscribed, False if otherwise - :rtype: bool - """ - return self.__subscribed - - -from .. util.log import LogFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/tenant/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/tenant/__init__.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/tenant/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/tenant/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/tenant/tenantcontext.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/tenant/tenantcontext.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/tenant/tenantcontext.py deleted file mode 100644 index 202bd35..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/tenant/tenantcontext.py +++ /dev/null @@ -1,184 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -class Tenant: - """ - Object type representing the tenant details of a single tenant - """ - - def __init__(self, tenant_id, tenant_domain): - self.tenant_id = tenant_id - """ :type : int """ - self.tenant_domain = tenant_domain - """ :type : str """ - self.service_name_subscription_map = {} - """ :type : dict[str, Subscription] """ - - def get_subscription(self, service_name): - """ - Returns the Subscription object related to the provided service name - :param str service_name: service name to be retrieved - :return: Subscription of the service or None if the service name doesn't exist - :rtype: Subscription - """ - if service_name in self.service_name_subscription_map: - return self.service_name_subscription_map[service_name] - - return None - - def is_subscribed(self, service_name): - """ - Checks if the given service name has a subscription from this tenant - :param str service_name: name of the service to check - :return: True if the tenant is subscribed to the given service name, False if not - :rtype: bool - """ - return service_name in self.service_name_subscription_map - - def add_subscription(self, subscription): - """ - Adds a subscription information entry on the subscription list for this tenant - :param Subscription subscription: Subscription information to be added - :return: void - :rtype: void - """ - self.service_name_subscription_map[subscription.service_name] = subscription - - def remove_subscription(self, service_name): - """ - Removes the specified subscription details from the subscription list - :param str service_name: The service name of the subscription to be removed - :return: void - :rtype: void - """ - if service_name in self.service_name_subscription_map: - self.service_name_subscription_map.pop(service_name) - - -class Subscription: - """ - Subscription information of a particular subscription to a service - """ - - def __init__(self, service_name, cluster_ids): - self.service_name = service_name - """ :type : str """ - self.cluster_ids = cluster_ids - """ :type : list[str] """ - self.subscription_domain_map = {} - """ :type : dict[str, SubscriptionDomain] """ - - def add_subscription_domain(self, domain_name, application_context): - """ - Adds a subscription domain - :param str domain_name: - :param str application_context: - :return: void - :rtype: void - """ - self.subscription_domain_map[domain_name] = SubscriptionDomain(domain_name, application_context) - - def remove_subscription_domain(self, domain_name): - """ - Removes the subscription domain of the specified domain name - :param str domain_name: - :return: void - :rtype: void - """ - if domain_name in self.subscription_domain_map: - self.subscription_domain_map.pop(domain_name) - - def subscription_domain_exists(self, domain_name): - """ - Returns the SubscriptionDomain information of the specified domain name - :param str domain_name: - :return: SubscriptionDomain - :rtype: SubscriptionDomain - """ - return domain_name in self.subscription_domain_map - - def get_subscription_domains(self): - """ - Returns the list of subscription domains of this subscription - :return: List of SubscriptionDomain objects - :rtype: list[SubscriptionDomain] - """ - return self.subscription_domain_map.values() - - -class SubscriptionDomain: - """ - Represents a Subscription Domain - """ - - def __init__(self, domain_name, application_context): - self.domain_name = domain_name - """ :type : str """ - self.application_context = application_context - """ :type : str """ - - -class TenantContext: - """ - Handles and maintains a model of all the information related to tenants within this instance - """ - tenants = {} - initialized = False - tenant_domains = {"carbon.super": Tenant(-1234, "carbon.super")} - - @staticmethod - def add_tenant(tenant): - TenantContext.tenants[tenant.tenant_id] = tenant - TenantContext.tenant_domains[tenant.tenant_domain] = tenant - - @staticmethod - def remove_tenant(tenant_id): - if tenant_id in TenantContext.tenants: - tenant = TenantContext.get_tenant(tenant_id) - TenantContext.tenants.pop(tenant.tenant_id) - TenantContext.tenant_domains.pop(tenant.tenant_domain) - - @staticmethod - def update(tenants): - for tenant in tenants: - TenantContext.add_tenant(tenant) - - @staticmethod - def get_tenant(tenant_id): - """ - Gets the Tenant object of the provided tenant ID - :param int tenant_id: - :return: Tenant object of the provided tenant ID - :rtype: Tenant - """ - if tenant_id in TenantContext.tenants: - return TenantContext.tenants[tenant_id] - - return None - - @staticmethod - def get_tenant_by_domain(tenant_domain): - """ - Gets the Tenant object of the provided tenant domain - :param str tenant_domain: - :return: Tenant object of the provided tenant domain - :rtype: str - """ - if tenant_domain in TenantContext.tenant_domains: - return TenantContext.tenant_domains[tenant_domain] - - return None \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/c5adb7aa/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/topology/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/topology/__init__.py b/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/topology/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/python_cartridgeagent/cartridgeagent/modules/topology/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License.
