PCA - Publish MB events using a thread and timeout after 5 seconds. Improved PCA structure and removed unnecessary threading PCA Live Test - Improved logging, improved MB HA test case
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d1912180 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d1912180 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d1912180 Branch: refs/heads/stratos-4.1.x Commit: d19121809154d42998ddc0c85722600f03f7225f Parents: e722ff3 Author: Chamila de Alwis <[email protected]> Authored: Fri Nov 27 14:36:42 2015 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Mon Nov 30 19:13:55 2015 +0530 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/agent.py | 62 +- .../modules/event/eventhandler.py | 1181 +++++++++--------- .../cartridge.agent/publisher.py | 72 +- .../cartridge.agent/subscriber.py | 8 +- .../integration/tests/ADCExtensionTestCase.java | 5 + .../tests/ADCMTAppTenantUserTestCase.java | 5 + .../integration/tests/ADCMTAppTestCase.java | 5 + .../agent/integration/tests/ADCTestCase.java | 5 + .../integration/tests/AgentStartupTestCase.java | 5 + .../integration/tests/CEPHAModeTestCase.java | 5 + .../tests/MessageBrokerHATestCase.java | 30 +- .../tests/PythonAgentIntegrationTest.java | 14 +- .../MessageBrokerHATestCase/agent.conf | 2 +- .../src/test/resources/log4j.properties | 2 +- .../src/test/resources/test-suite-all.xml | 1 + .../src/test/resources/test-suite-smoke.xml | 2 + 16 files changed, 734 insertions(+), 670 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py index 959568b..6b81dff 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py @@ -16,22 +16,19 @@ # specific language governing permissions and limitations # under the License. -import threading - import publisher from logpublisher import * from modules.event.application.signup.events import * from modules.event.domain.mapping.events import * -from modules.event.eventhandler import EventHandler +import modules.event.eventhandler as event_handler from modules.event.instance.notifier.events import * from modules.event.tenant.events import * from modules.event.topology.events import * from subscriber import EventSubscriber -class CartridgeAgent(threading.Thread): +class CartridgeAgent(object): def __init__(self): - threading.Thread.__init__(self) Config.initialize_config() self.__tenant_context_initialized = False self.__log_publish_manager = None @@ -47,9 +44,7 @@ class CartridgeAgent(threading.Thread): self.__app_topic_subscriber = EventSubscriber(constants.APPLICATION_SIGNUP, mb_urls, mb_uname, mb_pwd) self.__topology_event_subscriber = EventSubscriber(constants.TOPOLOGY_TOPIC, mb_urls, mb_uname, mb_pwd) - self.__event_handler = EventHandler() - - def run(self): + def run_agent(self): self.__log.info("Starting Cartridge Agent...") # Start topology event receiver thread @@ -58,7 +53,7 @@ class CartridgeAgent(threading.Thread): if Config.lvs_virtual_ip is None or str(Config.lvs_virtual_ip).strip() == "": self.__log.debug("LVS Virtual IP is not defined") else: - self.__event_handler.create_dummy_interface() + event_handler.create_dummy_interface() # request complete topology event from CC by publishing CompleteTopologyRequestEvent publisher.publish_complete_topology_request_event() @@ -84,14 +79,14 @@ class CartridgeAgent(threading.Thread): publisher.publish_complete_tenant_request_event() # Execute instance started shell script - self.__event_handler.on_instance_started_event() + event_handler.on_instance_started_event() # Publish instance started event publisher.publish_instance_started_event() # Execute start servers extension try: - self.__event_handler.start_server_extension() + event_handler.start_server_extension() except Exception as e: self.__log.exception("Error processing start servers event: %s" % e) @@ -100,7 +95,7 @@ class CartridgeAgent(threading.Thread): if repo_url is None or str(repo_url).strip() == "": self.__log.info("No artifact repository found") publisher.publish_instance_activated_event() - self.__event_handler.on_instance_activated_event() + event_handler.on_instance_activated_event() else: # instance activated event will be published in artifact updated event handler self.__log.info( @@ -109,7 +104,7 @@ class CartridgeAgent(threading.Thread): persistence_mapping_payload = Config.persistence_mappings if persistence_mapping_payload is not None: - self.__event_handler.volume_mount_extension(persistence_mapping_payload) + event_handler.volume_mount_extension(persistence_mapping_payload) # start log publishing thread if DataPublisherConfiguration.get_instance().enabled: @@ -198,14 +193,14 @@ class CartridgeAgent(threading.Thread): def on_artifact_updated(self, msg): event_obj = ArtifactUpdatedEvent.create_from_json(msg.payload) - self.__event_handler.on_artifact_updated_event(event_obj) + event_handler.on_artifact_updated_event(event_obj) def on_instance_cleanup_member(self, msg): member_in_payload = Config.member_id event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload) member_in_event = event_obj.member_id if member_in_payload == member_in_event: - self.__event_handler.on_instance_cleanup_member_event() + event_handler.on_instance_cleanup_member_event() def on_instance_cleanup_cluster(self, msg): event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload) @@ -215,7 +210,7 @@ class CartridgeAgent(threading.Thread): instance_in_event = event_obj.cluster_instance_id if cluster_in_event == cluster_in_payload and instance_in_payload == instance_in_event: - self.__event_handler.on_instance_cleanup_cluster_event() + event_handler.on_instance_cleanup_cluster_event() def on_member_created(self, msg): self.__log.debug("Member created event received: %r" % msg.payload) @@ -227,7 +222,7 @@ class CartridgeAgent(threading.Thread): if not TopologyContext.topology.initialized: return - self.__event_handler.on_member_initialized_event(event_obj) + event_handler.on_member_initialized_event(event_obj) def on_member_activated(self, msg): self.__log.debug("Member activated event received: %r" % msg.payload) @@ -235,7 +230,7 @@ class CartridgeAgent(threading.Thread): return event_obj = MemberActivatedEvent.create_from_json(msg.payload) - self.__event_handler.on_member_activated_event(event_obj) + event_handler.on_member_activated_event(event_obj) def on_member_terminated(self, msg): self.__log.debug("Member terminated event received: %r" % msg.payload) @@ -243,7 +238,7 @@ class CartridgeAgent(threading.Thread): return event_obj = MemberTerminatedEvent.create_from_json(msg.payload) - self.__event_handler.on_member_terminated_event(event_obj) + event_handler.on_member_terminated_event(event_obj) def on_member_suspended(self, msg): self.__log.debug("Member suspended event received: %r" % msg.payload) @@ -251,7 +246,7 @@ class CartridgeAgent(threading.Thread): return event_obj = MemberSuspendedEvent.create_from_json(msg.payload) - self.__event_handler.on_member_suspended_event(event_obj) + event_handler.on_member_suspended_event(event_obj) def on_complete_topology(self, msg): event_obj = CompleteTopologyEvent.create_from_json(msg.payload) @@ -259,7 +254,7 @@ class CartridgeAgent(threading.Thread): if not TopologyContext.topology.initialized: self.__log.info("Topology initialized from complete topology event") TopologyContext.topology.initialized = True - self.__event_handler.on_complete_topology_event(event_obj) + event_handler.on_complete_topology_event(event_obj) self.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str) @@ -269,17 +264,17 @@ class CartridgeAgent(threading.Thread): return event_obj = MemberStartedEvent.create_from_json(msg.payload) - self.__event_handler.on_member_started_event(event_obj) + event_handler.on_member_started_event(event_obj) def on_domain_mapping_added(self, msg): self.__log.debug("Subscription domain added event received : %r" % msg.payload) event_obj = DomainMappingAddedEvent.create_from_json(msg.payload) - self.__event_handler.on_domain_mapping_added_event(event_obj) + event_handler.on_domain_mapping_added_event(event_obj) def on_domain_mapping_removed(self, msg): self.__log.debug("Subscription domain removed event received : %r" % msg.payload) event_obj = DomainMappingRemovedEvent.create_from_json(msg.payload) - self.__event_handler.on_domain_mapping_removed_event(event_obj) + event_handler.on_domain_mapping_removed_event(event_obj) def on_complete_tenant(self, msg): event_obj = CompleteTenantEvent.create_from_json(msg.payload) @@ -287,19 +282,19 @@ class CartridgeAgent(threading.Thread): if not self.__tenant_context_initialized: self.__log.info("Tenant context initialized from complete tenant event") self.__tenant_context_initialized = True - self.__event_handler.on_complete_tenant_event(event_obj) + event_handler.on_complete_tenant_event(event_obj) self.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json) def on_tenant_subscribed(self, msg): self.__log.debug("Tenant subscribed event received: %r" % msg.payload) event_obj = TenantSubscribedEvent.create_from_json(msg.payload) - self.__event_handler.on_tenant_subscribed_event(event_obj) + event_handler.on_tenant_subscribed_event(event_obj) def on_application_signup_removed(self, msg): self.__log.debug("Application signup removed event received: %r" % msg.payload) event_obj = ApplicationSignUpRemovedEvent.create_from_json(msg.payload) - self.__event_handler.on_application_signup_removed_event(event_obj) + event_handler.on_application_signup_removed_event(event_obj) def wait_for_complete_topology(self): while not TopologyContext.topology.initialized: @@ -308,17 +303,12 @@ class CartridgeAgent(threading.Thread): self.__log.info("Complete topology event received") -def main(): - cartridge_agent = CartridgeAgent() +if __name__ == "__main__": log = LogFactory().get_log(__name__) - try: log.info("Starting Stratos cartridge agent...") - cartridge_agent.start() + cartridge_agent = CartridgeAgent() + cartridge_agent.run_agent() except Exception as e: log.exception("Cartridge Agent Exception: %r" % e) - cartridge_agent.terminate() - - -if __name__ == "__main__": - main() + # cartridge_agent.terminate() http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py index 6e2aa4f..f8b0c2b 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py @@ -21,8 +21,6 @@ from threading import Thread import publisher from entity import * -import constants -from config import Config from ..artifactmgt.git.agentgithandler import * from ..artifactmgt.repository import Repository from ..util import cartridgeagentutils @@ -31,485 +29,492 @@ from ..util.log import LogFactory SUPER_TENANT_ID = "-1234" SUPER_TENANT_REPO_PATH = "/repository/deployment/server/" TENANT_REPO_PATH = "/repository/tenants/" +log = LogFactory().get_log(__name__) + +""" +Event execution related logic +""" + + +def on_instance_started_event(): + log.debug("Processing instance started event...") + # TODO: copy artifacts extension + execute_event_extendables(constants.INSTANCE_STARTED_EVENT, {}) + + +def create_dummy_interface(): + log.debug("Processing lvs dummy interface creation...") + lvs_vip = Config.lvs_virtual_ip.split("|") + log.debug("LVS dummy interface creation values %s %s " % (lvs_vip[0], lvs_vip[1])) + execute_event_extendables( + constants.CREATE_LVS_DUMMY_INTERFACE, + {"EVENT": constants.CREATE_LVS_DUMMY_INTERFACE, + "LVS_DUMMY_VIRTUAL_IP": lvs_vip[0], + "LVS_SUBNET_MASK": lvs_vip[1]} + ) + + +def on_instance_activated_event(): + log.debug("Processing instance activated event...") + execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {}) + + +def on_artifact_updated_event(artifacts_updated_event): + log.debug( + "Processing artifact updated event for [tenant] %s [cluster] %s [status] %s" + % (str(artifacts_updated_event.tenant_id), artifacts_updated_event.cluster_id, artifacts_updated_event.status)) + + cluster_id_event = str(artifacts_updated_event.cluster_id).strip() + cluster_id_payload = Config.cluster_id + repo_url = str(artifacts_updated_event.repo_url).strip() + + if repo_url == "": + log.error("Repository URL is empty. Failed to process artifact updated event.") + return + + if cluster_id_payload is None or cluster_id_payload == "": + log.error("Cluster ID in payload is empty. Failed to process artifact updated event.") + return + + if cluster_id_payload != cluster_id_event: + log.debug("Cluster ID in artifact updated event does not match. Skipping event handler.") + return + + repo_password = None + if artifacts_updated_event.repo_password is not None: + secret = Config.cartridge_key + repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret) + + if Config.app_path is None: + log.error("Repository path is empty. Failed to process artifact updated event.") + return + + repo_username = artifacts_updated_event.repo_username + tenant_id = artifacts_updated_event.tenant_id + is_multitenant = Config.is_multiTenant + commit_enabled = artifacts_updated_event.commit_enabled + + # create repo object + local_repo_path = get_repo_path_for_tenant(str(tenant_id), Config.app_path, is_multitenant) + repo_info = Repository(repo_url, repo_username, repo_password, local_repo_path, tenant_id, commit_enabled) + log.info("Executing checkout job on artifact updated event...") + + try: + Config.artifact_checkout_plugin.plugin_object.checkout(repo_info) + except Exception as e: + log.exception( + "Checkout job on artifact updated event failed for tenant: %s %s" % (repo_info.tenant_id, e)) + + # execute artifact updated extension + plugin_values = {"ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id, + "ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id, + "ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url, + "ARTIFACT_UPDATED_REPO_PASSWORD": artifacts_updated_event.repo_password, + "ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username, + "ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status} + + try: + execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values) + except Exception as e: + log.exception("Could not execute plugins for artifact updated event: %s" % e) + + if not Config.activated: + # publish instance activated event if not yet activated + publisher.publish_instance_activated_event() + on_instance_activated_event() + + update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True) + auto_commit = Config.is_commits_enabled + auto_checkout = Config.is_checkout_enabled + log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s" + % (update_artifacts, auto_commit, auto_checkout)) + + if update_artifacts: + try: + update_interval = int(Config.artifact_update_interval) + except ValueError: + log.exception("Invalid artifact sync interval specified: %s" % ValueError) + update_interval = 10 + + log.info("Artifact updating task enabled, update interval: %s seconds" % update_interval) + + log.info("Auto Commit is turned %s " % ("on" if auto_commit else "off")) + log.info("Auto Checkout is turned %s " % ("on" if auto_checkout else "off")) + + AgentGitHandler.schedule_artifact_update_task( + repo_info, + auto_checkout, + auto_commit, + update_interval) + + +def on_instance_cleanup_cluster_event(): + log.debug("Processing instance cleanup cluster event...") + cleanup(constants.INSTANCE_CLEANUP_CLUSTER_EVENT) + + +def on_instance_cleanup_member_event(): + log.debug("Processing instance cleanup member event...") + cleanup(constants.INSTANCE_CLEANUP_MEMBER_EVENT) + + +def on_member_activated_event(member_activated_event): + log.debug( + "Processing Member activated event: [service] %r [cluster] %r [member] %r" + % (member_activated_event.service_name, + member_activated_event.cluster_id, + member_activated_event.member_id)) + + member_initialized = is_member_initialized_in_topology( + member_activated_event.service_name, + member_activated_event.cluster_id, + member_activated_event.member_id) + + if not member_initialized: + log.error("Member has not initialized, failed to execute member activated event") + return + + execute_event_extendables(constants.MEMBER_ACTIVATED_EVENT, {}) + + +def on_complete_topology_event(complete_topology_event): + log.debug("Processing Complete topology event...") + + service_name_in_payload = Config.service_name + cluster_id_in_payload = Config.cluster_id + member_id_in_payload = Config.member_id + + if not Config.initialized: + member_initialized = is_member_initialized_in_topology( + service_name_in_payload, + cluster_id_in_payload, + member_id_in_payload) + if member_initialized: + # Set cartridge agent as initialized since member is available and it is in initialized state + Config.initialized = True + log.info( + "Member initialized [member id] %s, [cluster-id] %s, [service] %s" + % (member_id_in_payload, cluster_id_in_payload, service_name_in_payload)) -class EventHandler: + topology = complete_topology_event.get_topology() + service = topology.get_service(service_name_in_payload) + if service is None: + raise Exception("Service not found in topology [service] %s" % service_name_in_payload) + + cluster = service.get_cluster(cluster_id_in_payload) + if cluster is None: + raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id_in_payload) + + plugin_values = {"TOPOLOGY_JSON": json.dumps(topology.json_str), + "MEMBER_LIST_JSON": json.dumps(cluster.member_list_json)} + + execute_event_extendables(constants.COMPLETE_TOPOLOGY_EVENT, plugin_values) + + +def on_member_initialized_event(member_initialized_event): """ - Event execution related logic + Member initialized event is sent by cloud controller once volume attachment and + ip address allocation is completed successfully + :param member_initialized_event: + :return: """ + log.debug("Processing Member initialized event...") + service_name_in_payload = Config.service_name + cluster_id_in_payload = Config.cluster_id + member_id_in_payload = Config.member_id + + if not Config.initialized and member_id_in_payload == member_initialized_event.member_id: + member_exists = member_exists_in_topology( + service_name_in_payload, + cluster_id_in_payload, + member_id_in_payload) + + log.debug("Member exists: %s" % member_exists) + + if member_exists: + Config.initialized = True + mark_member_as_initialized(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) + log.info("Instance marked as initialized on member initialized event") + else: + raise Exception("Member [member-id] %s not found in topology while processing member initialized " + "event. [Topology] %s" % (member_id_in_payload, TopologyContext.get_topology())) - def __init__(self): - self.__log = LogFactory().get_log(__name__) + execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {}) - def on_instance_started_event(self): - self.__log.debug("Processing instance started event...") - # TODO: copy artifacts extension - self.execute_event_extendables(constants.INSTANCE_STARTED_EVENT, {}) - - def create_dummy_interface(self): - self.__log.debug("Processing lvs dummy interface creation...") - lvs_vip = Config.lvs_virtual_ip.split("|") - self.__log.debug("LVS dummy interface creation values %s %s " % (lvs_vip[0], lvs_vip[1])) - self.execute_event_extendables(constants.CREATE_LVS_DUMMY_INTERFACE, - {"EVENT": constants.CREATE_LVS_DUMMY_INTERFACE, - "LVS_DUMMY_VIRTUAL_IP": lvs_vip[0], - "LVS_SUBNET_MASK": lvs_vip[1]}) - - def on_instance_activated_event(self): - self.__log.debug("Processing instance activated event...") - self.execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {}) - - def on_artifact_updated_event(self, artifacts_updated_event): - self.__log.debug("Processing artifact updated event for [tenant] %s [cluster] %s [status] %s" % - (str(artifacts_updated_event.tenant_id), - artifacts_updated_event.cluster_id, - artifacts_updated_event.status)) - - cluster_id_event = str(artifacts_updated_event.cluster_id).strip() - cluster_id_payload = Config.cluster_id - repo_url = str(artifacts_updated_event.repo_url).strip() - - if repo_url == "": - self.__log.error("Repository URL is empty. Failed to process artifact updated event.") - return - - if cluster_id_payload is None or cluster_id_payload == "": - self.__log.error("Cluster ID in payload is empty. Failed to process artifact updated event.") - return - - if cluster_id_payload != cluster_id_event: - self.__log.debug("Cluster ID in artifact updated event does not match. Skipping event handler.") - return - - repo_password = None - if artifacts_updated_event.repo_password is not None: - secret = Config.cartridge_key - repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret) - - if Config.app_path is None: - self.__log.error("Repository path is empty. Failed to process artifact updated event.") - return - - if not EventHandler.validate_repo_path(Config.app_path): - self.__log.error( - "Repository path cannot be accessed, or is invalid. Failed to process artifact updated event.") - return - - repo_username = artifacts_updated_event.repo_username - tenant_id = artifacts_updated_event.tenant_id - is_multitenant = Config.is_multiTenant - commit_enabled = artifacts_updated_event.commit_enabled - - # create repo object - local_repo_path = self.get_repo_path_for_tenant(str(tenant_id), Config.app_path, is_multitenant) - repo_info = Repository(repo_url, repo_username, repo_password, local_repo_path, tenant_id, commit_enabled) - self.__log.info("Executing checkout job on artifact updated event...") - try: - Config.artifact_checkout_plugin.plugin_object.checkout(repo_info) - except Exception as e: - self.__log.exception( - "Checkout job on artifact updated event failed for tenant: %s %s" % (repo_info.tenant_id, e)) +def on_complete_tenant_event(complete_tenant_event): + log.debug("Processing Complete tenant event...") - # execute artifact updated extension - plugin_values = {"ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id, - "ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id, - "ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url, - "ARTIFACT_UPDATED_REPO_PASSWORD": artifacts_updated_event.repo_password, - "ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username, - "ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status} + tenant_list_json = complete_tenant_event.tenant_list_json + log.debug("Complete tenants:" + json.dumps(tenant_list_json)) - try: - self.execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values) - except Exception as e: - self.__log.exception("Could not execute plugins for artifact updated event: %s" % e) - - if not Config.activated: - # publish instance activated event if not yet activated - publisher.publish_instance_activated_event() - self.on_instance_activated_event() - - update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True) - auto_commit = Config.is_commits_enabled - auto_checkout = Config.is_checkout_enabled - self.__log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s", - update_artifacts, auto_commit, auto_checkout) - if update_artifacts: - try: - update_interval = int(Config.artifact_update_interval) - except ValueError: - self.__log.exception("Invalid artifact sync interval specified: %s" % ValueError) - update_interval = 10 - - self.__log.info("Artifact updating task enabled, update interval: %s seconds" % update_interval) - - self.__log.info("Auto Commit is turned %s " % ("on" if auto_commit else "off")) - self.__log.info("Auto Checkout is turned %s " % ("on" if auto_checkout else "off")) - - AgentGitHandler.schedule_artifact_update_task( - repo_info, - auto_checkout, - auto_commit, - update_interval) - - def on_instance_cleanup_cluster_event(self): - self.__log.debug("Processing instance cleanup cluster event...") - self.cleanup(constants.INSTANCE_CLEANUP_CLUSTER_EVENT) - - def on_instance_cleanup_member_event(self): - self.__log.debug("Processing instance cleanup member event...") - self.cleanup(constants.INSTANCE_CLEANUP_MEMBER_EVENT) - - def on_member_activated_event(self, member_activated_event): - self.__log.debug("Processing Member activated event: [service] %r [cluster] %r [member] %r" - % (member_activated_event.service_name, - member_activated_event.cluster_id, - member_activated_event.member_id)) - - member_initialized = self.is_member_initialized_in_topology( - member_activated_event.service_name, - member_activated_event.cluster_id, - member_activated_event.member_id) - - if not member_initialized: - self.__log.error("Member has not initialized, failed to execute member activated event") - return - - self.execute_event_extendables(constants.MEMBER_ACTIVATED_EVENT, {}) - - def on_complete_topology_event(self, complete_topology_event): - self.__log.debug("Processing Complete topology event...") - - service_name_in_payload = Config.service_name - cluster_id_in_payload = Config.cluster_id - member_id_in_payload = Config.member_id - - if not Config.initialized: - member_initialized = self.is_member_initialized_in_topology( - service_name_in_payload, - cluster_id_in_payload, - member_id_in_payload) - - if member_initialized: - # Set cartridge agent as initialized since member is available and it is in initialized state - Config.initialized = True - self.__log.info("Member initialized [member id] %s, [cluster-id] %s, [service] %s" % - (member_id_in_payload, cluster_id_in_payload, service_name_in_payload)) - - topology = complete_topology_event.get_topology() - service = topology.get_service(service_name_in_payload) - if service is None: - raise Exception("Service not found in topology [service] %s" % service_name_in_payload) + plugin_values = {"TENANT_LIST_JSON": json.dumps(tenant_list_json)} - cluster = service.get_cluster(cluster_id_in_payload) - if cluster is None: - raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id_in_payload) - - plugin_values = {"TOPOLOGY_JSON": json.dumps(topology.json_str), - "MEMBER_LIST_JSON": json.dumps(cluster.member_list_json)} - - self.execute_event_extendables(constants.COMPLETE_TOPOLOGY_EVENT, plugin_values) - - def on_member_initialized_event(self, member_initialized_event): - """ - Member initialized event is sent by cloud controller once volume attachment and - ip address allocation is completed successfully - :return: - """ - self.__log.debug("Processing Member initialized event...") - service_name_in_payload = Config.service_name - cluster_id_in_payload = Config.cluster_id - member_id_in_payload = Config.member_id - - if not Config.initialized and member_id_in_payload == member_initialized_event.member_id: - member_exists = self.member_exists_in_topology(service_name_in_payload, cluster_id_in_payload, - member_id_in_payload) - self.__log.debug("Member exists: %s" % member_exists) - if member_exists: - Config.initialized = True - self.mark_member_as_initialized(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) - self.__log.info("Instance marked as initialized on member initialized event") - else: - raise Exception("Member [member-id] %s not found in topology while processing member initialized " - "event. [Topology] %s" % (member_id_in_payload, TopologyContext.get_topology())) + execute_event_extendables(constants.COMPLETE_TENANT_EVENT, plugin_values) + + +def on_member_terminated_event(member_terminated_event): + log.debug( + "Processing Member terminated event: [service] %s [cluster] %s [member] %s" + % (member_terminated_event.service_name, member_terminated_event.cluster_id, member_terminated_event.member_id)) + + member_initialized = is_member_initialized_in_topology( + member_terminated_event.service_name, + member_terminated_event.cluster_id, + member_terminated_event.member_id + ) + + if not member_initialized: + log.error("Member has not initialized, failed to execute member terminated event") + return + + execute_event_extendables(constants.MEMBER_TERMINATED_EVENT, {}) + + +def on_member_suspended_event(member_suspended_event): + log.debug( + "Processing Member suspended event: [service] %s [cluster] %s [member] %s" + % (member_suspended_event.service_name, member_suspended_event.cluster_id, member_suspended_event.member_id)) + + member_initialized = is_member_initialized_in_topology( + member_suspended_event.service_name, + member_suspended_event.cluster_id, + member_suspended_event.member_id + ) + + if not member_initialized: + log.error("Member has not initialized, failed to execute member suspended event") + return + + execute_event_extendables(constants.MEMBER_SUSPENDED_EVENT, {}) + + +def on_member_started_event(member_started_event): + log.debug( + "Processing Member started event: [service] %s [cluster] %s [member] %s" + % (member_started_event.service_name, member_started_event.cluster_id, member_started_event.member_id)) + + member_initialized = is_member_initialized_in_topology( + member_started_event.service_name, + member_started_event.cluster_id, + member_started_event.member_id + ) + + if not member_initialized: + log.error("Member has not initialized, failed to execute member started event") + return + + execute_event_extendables(constants.MEMBER_STARTED_EVENT, {}) + + +def start_server_extension(): + log.debug("Processing start server extension...") + service_name_in_payload = Config.service_name + cluster_id_in_payload = Config.cluster_id + member_id_in_payload = Config.member_id + member_initialized = is_member_initialized_in_topology( + service_name_in_payload, cluster_id_in_payload, member_id_in_payload) + + if not member_initialized: + log.error("Member has not initialized, failed to execute start server event") + return + + execute_event_extendables("StartServers", {}) + + +def volume_mount_extension(persistence_mappings_payload): + log.debug("Processing volume mount extension...") + execute_event_extendables("VolumeMount", persistence_mappings_payload) + + +def on_domain_mapping_added_event(domain_mapping_added_event): + tenant_domain = find_tenant_domain(domain_mapping_added_event.tenant_id) + log.debug( + "Processing Domain mapping added event: [tenant-id] " + str(domain_mapping_added_event.tenant_id) + + " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_added_event.domain_name + + " [application-context] " + domain_mapping_added_event.application_context + ) + + plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_added_event.application_id, + "SUBSCRIPTION_SERVICE_NAME": domain_mapping_added_event.service_name, + "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_added_event.domain_name, + "SUBSCRIPTION_CLUSTER_ID": domain_mapping_added_event.cluster_id, + "SUBSCRIPTION_TENANT_ID": int(domain_mapping_added_event.tenant_id), + "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain, + "SUBSCRIPTION_CONTEXT_PATH": + domain_mapping_added_event.context_path} + + execute_event_extendables(constants.DOMAIN_MAPPING_ADDED_EVENT, plugin_values) + + +def on_domain_mapping_removed_event(domain_mapping_removed_event): + tenant_domain = find_tenant_domain(domain_mapping_removed_event.tenant_id) + log.info( + "Domain mapping removed event received: [tenant-id] " + str(domain_mapping_removed_event.tenant_id) + + " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_removed_event.domain_name + ) + + plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_removed_event.application_id, + "SUBSCRIPTION_SERVICE_NAME": domain_mapping_removed_event.service_name, + "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_removed_event.domain_name, + "SUBSCRIPTION_CLUSTER_ID": domain_mapping_removed_event.cluster_id, + "SUBSCRIPTION_TENANT_ID": int(domain_mapping_removed_event.tenant_id), + "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain} + + execute_event_extendables(constants.DOMAIN_MAPPING_REMOVED_EVENT, plugin_values) + + +def on_copy_artifacts_extension(src, dest): + log.debug("Processing Copy artifacts extension...") + plugin_values = {"SOURCE": src, "DEST": dest} + execute_event_extendables("CopyArtifacts", plugin_values) - self.execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {}) - - def on_complete_tenant_event(self, complete_tenant_event): - self.__log.debug("Processing Complete tenant event...") - - tenant_list_json = complete_tenant_event.tenant_list_json - self.__log.debug("Complete tenants:" + json.dumps(tenant_list_json)) - - plugin_values = {"TENANT_LIST_JSON": json.dumps(tenant_list_json)} - - self.execute_event_extendables(constants.COMPLETE_TENANT_EVENT, plugin_values) - - def on_member_terminated_event(self, member_terminated_event): - self.__log.debug("Processing Member terminated event: [service] %s [cluster] %s [member] %s" % - (member_terminated_event.service_name, member_terminated_event.cluster_id, - member_terminated_event.member_id)) - - member_initialized = self.is_member_initialized_in_topology( - member_terminated_event.service_name, - member_terminated_event.cluster_id, - member_terminated_event.member_id - ) - - if not member_initialized: - self.__log.error("Member has not initialized, failed to execute member terminated event") - return - - self.execute_event_extendables(constants.MEMBER_TERMINATED_EVENT, {}) - - def on_member_suspended_event(self, member_suspended_event): - self.__log.debug("Processing Member suspended event: [service] %s [cluster] %s [member] %s" % - (member_suspended_event.service_name, member_suspended_event.cluster_id, - member_suspended_event.member_id)) - - member_initialized = self.is_member_initialized_in_topology( - member_suspended_event.service_name, - member_suspended_event.cluster_id, - member_suspended_event.member_id - ) - - if not member_initialized: - self.__log.error("Member has not initialized, failed to execute member suspended event") - return - - self.execute_event_extendables(constants.MEMBER_SUSPENDED_EVENT, {}) - - def on_member_started_event(self, member_started_event): - self.__log.debug("Processing Member started event: [service] %s [cluster] %s [member] %s" % - (member_started_event.service_name, member_started_event.cluster_id, - member_started_event.member_id)) - - member_initialized = self.is_member_initialized_in_topology( - member_started_event.service_name, - member_started_event.cluster_id, - member_started_event.member_id - ) - - if not member_initialized: - self.__log.error("Member has not initialized, failed to execute member started event") - return - - self.execute_event_extendables(constants.MEMBER_STARTED_EVENT, {}) - - def start_server_extension(self): - self.__log.debug("Processing start server extension...") - service_name_in_payload = Config.service_name - cluster_id_in_payload = Config.cluster_id - member_id_in_payload = Config.member_id - member_initialized = self.is_member_initialized_in_topology(service_name_in_payload, cluster_id_in_payload, - member_id_in_payload) - - if not member_initialized: - self.__log.error("Member has not initialized, failed to execute start server event") - return - - self.execute_event_extendables("StartServers", {}) - - def volume_mount_extension(self, persistence_mappings_payload): - self.__log.debug("Processing volume mount extension...") - self.execute_event_extendables("VolumeMount", persistence_mappings_payload) - - def on_domain_mapping_added_event(self, domain_mapping_added_event): - tenant_domain = EventHandler.find_tenant_domain(domain_mapping_added_event.tenant_id) - self.__log.debug( - "Processing Domain mapping added event: [tenant-id] " + str(domain_mapping_added_event.tenant_id) + - " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_added_event.domain_name + - " [application-context] " + domain_mapping_added_event.application_context - ) - - plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_added_event.application_id, - "SUBSCRIPTION_SERVICE_NAME": domain_mapping_added_event.service_name, - "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_added_event.domain_name, - "SUBSCRIPTION_CLUSTER_ID": domain_mapping_added_event.cluster_id, - "SUBSCRIPTION_TENANT_ID": int(domain_mapping_added_event.tenant_id), - "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain, - "SUBSCRIPTION_CONTEXT_PATH": - domain_mapping_added_event.context_path} - - self.execute_event_extendables(constants.DOMAIN_MAPPING_ADDED_EVENT, plugin_values) - - def on_domain_mapping_removed_event(self, domain_mapping_removed_event): - tenant_domain = EventHandler.find_tenant_domain(domain_mapping_removed_event.tenant_id) - self.__log.info( - "Domain mapping removed event received: [tenant-id] " + str(domain_mapping_removed_event.tenant_id) + - " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_removed_event.domain_name - ) - - plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_removed_event.application_id, - "SUBSCRIPTION_SERVICE_NAME": domain_mapping_removed_event.service_name, - "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_removed_event.domain_name, - "SUBSCRIPTION_CLUSTER_ID": domain_mapping_removed_event.cluster_id, - "SUBSCRIPTION_TENANT_ID": int(domain_mapping_removed_event.tenant_id), - "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain} - - self.execute_event_extendables(constants.DOMAIN_MAPPING_REMOVED_EVENT, plugin_values) - - def on_copy_artifacts_extension(self, src, dest): - self.__log.debug("Processing Copy artifacts extension...") - plugin_values = {"SOURCE": src, "DEST": dest} - self.execute_event_extendables("CopyArtifacts", plugin_values) - - def on_tenant_subscribed_event(self, tenant_subscribed_event): - self.__log.debug( - "Processing Tenant subscribed event: [tenant] " + str(tenant_subscribed_event.tenant_id) + - " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids - ) - - self.execute_event_extendables(constants.TENANT_SUBSCRIBED_EVENT, {}) - - def on_application_signup_removed_event(self, application_signup_removal_event): - self.__log.debug( - "Processing Tenant unsubscribed event: [tenant] " + str(application_signup_removal_event.tenantId) + - " [application ID] " + str(application_signup_removal_event.applicationId) - ) - - if Config.application_id == application_signup_removal_event.applicationId: - AgentGitHandler.remove_repo(application_signup_removal_event.tenantId) - - self.execute_event_extendables(constants.APPLICATION_SIGNUP_REMOVAL_EVENT, {}) - - def cleanup(self, event): - self.__log.debug("Executing cleanup extension for event %s..." % event) - publisher.publish_maintenance_mode_event() - self.execute_event_extendables("clean", {}) - publisher.publish_instance_ready_to_shutdown_event() - - def execute_event_extendables(self, event, input_values): - """ Execute the extensions and plugins related to the event - :param event: The event name string - :param input_values: the values to be passed to the plugin - :return: - """ - try: - input_values = EventHandler.add_common_input_values(input_values) - except Exception as e: - self.__log.error("Error while adding common input values for event extendables: %s" % e) - input_values["EVENT"] = event - self.__log.debug("Executing extensions for [event] %s with [input values] %s" % (event, input_values)) - # Execute the extension - self.execute_extension_for_event(event, input_values) - # Execute the plugins - self.execute_plugins_for_event(event, input_values) - - def execute_plugins_for_event(self, event, input_values): - """ For each plugin registered for the specified event, start a plugin execution thread - :param str event: The event name string - :param dict input_values: the values to be passed to the plugin - :return: - """ - try: - plugins_for_event = Config.plugins.get(event) - if plugins_for_event is not None: - for plugin_info in plugins_for_event: - self.__log.debug("Executing plugin %s for event %s" % (plugin_info.name, event)) - plugin_thread = PluginExecutor(plugin_info, input_values) - plugin_thread.start() - - # block till plugin run completes. - plugin_thread.join() - else: - self.__log.debug("No plugins registered for event %s" % event) - except Exception as e: - self.__log.exception("Error while executing plugin for event %s: %s" % (event, e)) - - def execute_extension_for_event(self, event, extension_values): - """ Execute the extension related to the event - :param event: The event name string - :param extension_values: the values to be passed to the plugin - :return: - """ - try: - if Config.extension_executor is not None: - self.__log.debug("Executing extension for event [%s]" % event) - extension_thread = PluginExecutor(Config.extension_executor, extension_values) - extension_thread.start() + +def on_tenant_subscribed_event(tenant_subscribed_event): + log.debug( + "Processing Tenant subscribed event: [tenant] " + str(tenant_subscribed_event.tenant_id) + + " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids + ) + + execute_event_extendables(constants.TENANT_SUBSCRIBED_EVENT, {}) + + +def on_application_signup_removed_event(application_signup_removal_event): + log.debug( + "Processing Tenant unsubscribed event: [tenant] " + str(application_signup_removal_event.tenantId) + + " [application ID] " + str(application_signup_removal_event.applicationId) + ) + + if Config.application_id == application_signup_removal_event.applicationId: + AgentGitHandler.remove_repo(application_signup_removal_event.tenantId) + + execute_event_extendables(constants.APPLICATION_SIGNUP_REMOVAL_EVENT, {}) + + +def cleanup(event): + log.debug("Executing cleanup extension for event %s..." % event) + publisher.publish_maintenance_mode_event() + execute_event_extendables("clean", {}) + publisher.publish_instance_ready_to_shutdown_event() + + +def execute_event_extendables(event, input_values): + """ Execute the extensions and plugins related to the event + :param event: The event name string + :param input_values: the values to be passed to the plugin + :return: + """ + try: + input_values = add_common_input_values(input_values) + except Exception as e: + log.error("Error while adding common input values for event extendables: %s" % e) + input_values["EVENT"] = event + log.debug("Executing extensions for [event] %s with [input values] %s" % (event, input_values)) + # Execute the extension + execute_extension_for_event(event, input_values) + # Execute the plugins + execute_plugins_for_event(event, input_values) + + +def execute_plugins_for_event(event, input_values): + """ For each plugin registered for the specified event, start a plugin execution thread + :param str event: The event name string + :param dict input_values: the values to be passed to the plugin + :return: + """ + try: + plugins_for_event = Config.plugins.get(event) + if plugins_for_event is not None: + for plugin_info in plugins_for_event: + log.debug("Executing plugin %s for event %s" % (plugin_info.name, event)) + plugin_thread = PluginExecutor(plugin_info, input_values) + plugin_thread.start() # block till plugin run completes. - extension_thread.join() - else: - self.__log.debug("No extensions registered for event %s" % event) - except OSError as e: - self.__log.warn("No extension was found for event %s: %s" % (event, e)) - except Exception as e: - self.__log.exception("Error while executing extension for event %s: %s" % (event, e)) - - def get_repo_path_for_tenant(self, tenant_id, git_local_repo_path, is_multitenant): - """ Finds the repository path for tenant to clone from the remote repository - :param tenant_id: - :param git_local_repo_path: - :param is_multitenant: - :return: - """ - repo_path = "" - - if is_multitenant: - if tenant_id == SUPER_TENANT_ID: - # super tenant, /repository/deploy/server/ - super_tenant_repo_path = Config.super_tenant_repository_path - # "app_path" - repo_path += git_local_repo_path - - if super_tenant_repo_path is not None and super_tenant_repo_path != "": - super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") \ - else "/" + super_tenant_repo_path - super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") \ - else super_tenant_repo_path + "/" - # "app_path/repository/deploy/server/" - repo_path += super_tenant_repo_path - else: - # "app_path/repository/deploy/server/" - repo_path += SUPER_TENANT_REPO_PATH + plugin_thread.join() + else: + log.debug("No plugins registered for event %s" % event) + except Exception as e: + log.exception("Error while executing plugin for event %s: %s" % (event, e)) + +def execute_extension_for_event(event, extension_values): + """ Execute the extension related to the event + :param event: The event name string + :param extension_values: the values to be passed to the plugin + :return: + """ + try: + if Config.extension_executor is not None: + log.debug("Executing extension for event [%s]" % event) + extension_thread = PluginExecutor(Config.extension_executor, extension_values) + extension_thread.start() + + # block till plugin run completes. + extension_thread.join() + else: + log.debug("No extensions registered for event %s" % event) + except OSError as e: + log.warn("No extension was found for event %s: %s" % (event, e)) + except Exception as e: + log.exception("Error while executing extension for event %s: %s" % (event, e)) + + +def get_repo_path_for_tenant(tenant_id, git_local_repo_path, is_multitenant): + """ Finds the repository path for tenant to clone from the remote repository + :param tenant_id: + :param git_local_repo_path: + :param is_multitenant: + :return: + """ + repo_path = "" + + if is_multitenant: + if tenant_id == SUPER_TENANT_ID: + # super tenant, /repository/deploy/server/ + super_tenant_repo_path = Config.super_tenant_repository_path + # "app_path" + repo_path += git_local_repo_path + + if super_tenant_repo_path is not None and super_tenant_repo_path != "": + super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") \ + else "/" + super_tenant_repo_path + super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") \ + else super_tenant_repo_path + "/" + # "app_path/repository/deploy/server/" + repo_path += super_tenant_repo_path else: - # normal tenant, /repository/tenants/tenant_id - tenant_repo_path = Config.tenant_repository_path - # "app_path" - repo_path += git_local_repo_path - - if tenant_repo_path is not None and tenant_repo_path != "": - tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path - tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/" - # "app_path/repository/tenants/244653444" - repo_path += tenant_repo_path + tenant_id - else: - # "app_path/repository/tenants/244653444" - repo_path += TENANT_REPO_PATH + tenant_id - - # tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id - # GitUtils.create_dir(repo_path) + # "app_path/repository/deploy/server/" + repo_path += SUPER_TENANT_REPO_PATH + else: - # not multi tenant, app_path - repo_path = git_local_repo_path - - self.__log.debug("Repo path returned : %r" % repo_path) - return repo_path - - def is_member_initialized_in_topology(self, service_name, cluster_id, member_id): - if self.member_exists_in_topology(service_name, cluster_id, member_id): - topology = TopologyContext.get_topology() - service = topology.get_service(service_name) - if service is None: - raise Exception("Service not found in topology [service] %s" % service_name) - - cluster = service.get_cluster(cluster_id) - if cluster is None: - raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id) - - member = cluster.get_member(member_id) - if member is None: - raise Exception("Member id not found in topology [member] %s" % member_id) - - self.__log.info("Found member: " + member.to_json()) - if member.status == MemberStatus.Initialized: - return True - return False + # normal tenant, /repository/tenants/tenant_id + tenant_repo_path = Config.tenant_repository_path + # "app_path" + repo_path += git_local_repo_path + + if tenant_repo_path is not None and tenant_repo_path != "": + tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path + tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/" + # "app_path/repository/tenants/244653444" + repo_path += tenant_repo_path + tenant_id + else: + # "app_path/repository/tenants/244653444" + repo_path += TENANT_REPO_PATH + tenant_id + + # tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id + # GitUtils.create_dir(repo_path) + else: + # not multi tenant, app_path + repo_path = git_local_repo_path - def member_exists_in_topology(self, service_name, cluster_id, member_id): + log.debug("Repo path returned : %r" % repo_path) + return repo_path + + +def is_member_initialized_in_topology(service_name, cluster_id, member_id): + if member_exists_in_topology(service_name, cluster_id, member_id): topology = TopologyContext.get_topology() service = topology.get_service(service_name) if service is None: @@ -519,131 +524,149 @@ class EventHandler: if cluster is None: raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id) - activated_member = cluster.get_member(member_id) - if activated_member is None: - self.__log.error("Member id not found in topology [member] %s" % member_id) - return False + member = cluster.get_member(member_id) + if member is None: + raise Exception("Member id not found in topology [member] %s" % member_id) - return True + log.info("Found member: " + member.to_json()) + if member.status == MemberStatus.Initialized: + return True + return False - @staticmethod - def mark_member_as_initialized(service_name, cluster_id, member_id): - topology = TopologyContext.get_topology() - service = topology.get_service(service_name) + +def member_exists_in_topology(service_name, cluster_id, member_id): + topology = TopologyContext.get_topology() + service = topology.get_service(service_name) + if service is None: + raise Exception("Service not found in topology [service] %s" % service_name) + + cluster = service.get_cluster(cluster_id) + if cluster is None: + raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id) + + activated_member = cluster.get_member(member_id) + if activated_member is None: + log.error("Member id not found in topology [member] %s" % member_id) + return False + + return True + + +def mark_member_as_initialized(service_name, cluster_id, member_id): + topology = TopologyContext.get_topology() + service = topology.get_service(service_name) + if service is None: + raise Exception("Service not found in topology [service] %s" % service_name) + + cluster = service.get_cluster(cluster_id) + if cluster is None: + raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id) + + member = cluster.get_member(member_id) + if member is None: + raise Exception("Member id not found in topology [member] %s" % member_id) + member.status = MemberStatus.Initialized + + +def add_common_input_values(plugin_values): + """ + Adds the common parameters to be used by the extension scripts + :param dict[str, str] plugin_values: Dictionary to be added + :return: Dictionary with updated parameters + :rtype: dict[str, str] + """ + if plugin_values is None: + plugin_values = {} + elif type(plugin_values) != dict: + plugin_values = {"VALUE1": str(plugin_values)} + + plugin_values["APPLICATION_PATH"] = Config.app_path + plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False) + plugin_values["PERSISTENCE_MAPPINGS"] = Config.persistence_mappings + + lb_cluster_id_in_payload = Config.lb_cluster_id + lb_private_ip, lb_public_ip = get_lb_member_ip(lb_cluster_id_in_payload) + plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else Config.lb_private_ip + plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else Config.lb_public_ip + + topology = TopologyContext.get_topology() + if topology.initialized: + service = topology.get_service(Config.service_name) if service is None: - raise Exception("Service not found in topology [service] %s" % service_name) + raise Exception("Service not found in topology [service] %s" % Config.service_name) - cluster = service.get_cluster(cluster_id) + cluster = service.get_cluster(Config.cluster_id) if cluster is None: - raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id) + raise Exception("Cluster id not found in topology [cluster] %s" % Config.cluster_id) - member = cluster.get_member(member_id) + member = cluster.get_member(Config.member_id) if member is None: - raise Exception("Member id not found in topology [member] %s" % member_id) - member.status = MemberStatus.Initialized - - @staticmethod - def add_common_input_values(plugin_values): - """ - Adds the common parameters to be used by the extension scripts - :param dict[str, str] plugin_values: Dictionary to be added - :return: Dictionary with updated parameters - :rtype: dict[str, str] - """ - if plugin_values is None: - plugin_values = {} - elif type(plugin_values) != dict: - plugin_values = {"VALUE1": str(plugin_values)} - - plugin_values["APPLICATION_PATH"] = Config.app_path - plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False) - plugin_values["PERSISTENCE_MAPPINGS"] = Config.persistence_mappings - - lb_cluster_id_in_payload = Config.lb_cluster_id - lb_private_ip, lb_public_ip = EventHandler.get_lb_member_ip(lb_cluster_id_in_payload) - plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else Config.lb_private_ip - plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else Config.lb_public_ip + raise Exception("Member id not found in topology [member] %s" % Config.member_id) - topology = TopologyContext.get_topology() - if topology.initialized: - service = topology.get_service(Config.service_name) - if service is None: - raise Exception("Service not found in topology [service] %s" % Config.service_name) - - cluster = service.get_cluster(Config.cluster_id) - if cluster is None: - raise Exception("Cluster id not found in topology [cluster] %s" % Config.cluster_id) - - member = cluster.get_member(Config.member_id) - if member is None: - raise Exception("Member id not found in topology [member] %s" % Config.member_id) - - EventHandler.add_properties(service.properties, plugin_values, "SERVICE_PROPERTY") - EventHandler.add_properties(cluster.properties, plugin_values, "CLUSTER_PROPERTY") - EventHandler.add_properties(member.properties, plugin_values, "MEMBER_PROPERTY") - - plugin_values.update(Config.get_payload_params()) - - return EventHandler.clean_process_parameters(plugin_values) - - @staticmethod - def add_properties(properties, params, prefix): - """ - Adds the given property list to the parameters list with given prefix in the parameter name - :param dict[str, str] properties: service properties - :param dict[str, str] params: - :param str prefix: - :return: dict[str, str] - """ - if properties is None or properties.items() is None: - return - - for key in properties: - params[prefix + "_" + key] = str(properties[key]) - - @staticmethod - def get_lb_member_ip(lb_cluster_id): - topology = TopologyContext.get_topology() - services = topology.get_services() - - for service in services: - clusters = service.get_clusters() - for cluster in clusters: - members = cluster.get_members() - for member in members: - if member.cluster_id == lb_cluster_id: - return member.member_default_private_ip, member.member_default_public_ip - - return None, None - - @staticmethod - def clean_process_parameters(params): - """ - Removes any null valued parameters before passing them to the extension scripts - :param dict params: - :return: cleaned parameters - :rtype: dict - """ - for key, value in params.items(): - if value is None: - del params[key] - - return params - - @staticmethod - def find_tenant_domain(tenant_id): - tenant = TenantContext.get_tenant(tenant_id) - if tenant is None: - raise RuntimeError("Tenant could not be found: [tenant-id] %s" % str(tenant_id)) - - return tenant.tenant_domain - - @staticmethod - def validate_repo_path(app_path): - # app path would be ex: /var/www, or /opt/server/data - return os.access(app_path, os.W_OK) + add_properties(service.properties, plugin_values, "SERVICE_PROPERTY") + add_properties(cluster.properties, plugin_values, "CLUSTER_PROPERTY") + add_properties(member.properties, plugin_values, "MEMBER_PROPERTY") + + plugin_values.update(Config.get_payload_params()) + + return clean_process_parameters(plugin_values) + + +def add_properties(properties, params, prefix): + """ + Adds the given property list to the parameters list with given prefix in the parameter name + :param dict[str, str] properties: service properties + :param dict[str, str] params: + :param str prefix: + :return: dict[str, str] + """ + if properties is None or properties.items() is None: + return + + for key in properties: + params[prefix + "_" + key] = str(properties[key]) + + +def get_lb_member_ip(lb_cluster_id): + topology = TopologyContext.get_topology() + services = topology.get_services() + + for service in services: + clusters = service.get_clusters() + for cluster in clusters: + members = cluster.get_members() + for member in members: + if member.cluster_id == lb_cluster_id: + return member.member_default_private_ip, member.member_default_public_ip + + return None, None + + +def clean_process_parameters(params): + """ + Removes any null valued parameters before passing them to the extension scripts + :param dict params: + :return: cleaned parameters + :rtype: dict + """ + for key, value in params.items(): + if value is None: + del params[key] + + return params + + +def find_tenant_domain(tenant_id): + tenant = TenantContext.get_tenant(tenant_id) + if tenant is None: + raise RuntimeError("Tenant could not be found: [tenant-id] %s" % str(tenant_id)) + + return tenant.tenant_domain +def validate_repo_path(app_path): + # app path would be ex: /var/www, or /opt/server/data + return os.access(app_path, os.W_OK) class PluginExecutor(Thread): http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py index a24650a..5b6190e 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py @@ -15,10 +15,10 @@ # specific language governing permissions and limitations # under the License. -import threading - +from threading import Thread import paho.mqtt.publish as publish import time +from Queue import Queue, Empty import constants import healthstats @@ -201,7 +201,7 @@ def get_publisher(topic): return publishers[topic] -class EventPublisher: +class EventPublisher(object): """ Handles publishing events to topics to the provided message broker """ @@ -210,24 +210,9 @@ class EventPublisher: self.__topic = topic self.__log = LogFactory().get_log(__name__) self.__start_time = int(time.time()) + self.__msg_queue = Queue() def publish(self, event): - publisher_thread = threading.Thread(target=self.__publish_event, args=(event,)) - publisher_thread.start() - - def __publish_event(self, event): - """ - Publishes the given event to the message broker. - - When a list of message brokers are given the event is published to the first message broker - available. Therefore the message brokers should share the data (ex: Sharing the KahaDB in ActiveMQ). - - When the event cannot be published, it will be retried until the mb_publisher_timeout is exceeded. - This value is set in the agent.conf. - - :param event: - :return: True if the event was published. - """ if Config.mb_username is None: auth = None else: @@ -244,20 +229,59 @@ class EventPublisher: for mb_url in Config.mb_urls: mb_ip, mb_port = mb_url.split(":") + # start a thread to execute publish event + publisher_thread = Thread(target=self.__publish_event, args=(event, mb_ip, mb_port, auth, payload)) + publisher_thread.start() + + # give sometime for the thread to complete + time.sleep(5) + + # check if thread is still running and notify + if publisher_thread.isAlive(): + self.__log.debug( + "Event publishing timed out before succeeding. The message broker could be offline.") + + # check if publish.single() succeeded try: - publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port, auth=auth) - self.__log.debug("Event type: %s published to MB: %s:%s" % (str(event.__class__), mb_ip, mb_port)) + published = self.__msg_queue.get(block=False) + except Empty: + published = False + + if published: return True - except: - self.__log.debug( - "Could not publish event to message broker %s:%s." % (mb_ip, mb_port)) + # All the brokers on the list were offline self.__log.debug( "Could not publish event to any of the provided message brokers. Retrying in %s seconds." % retry_interval) time.sleep(retry_interval) + # Even publisher timeout exceeded self.__log.warn("Could not publish event to any of the provided message brokers before " "the timeout [%s] exceeded. The event will be dropped." % Config.mb_publisher_timeout) return False + + def __publish_event(self, event, mb_ip, mb_port, auth, payload): + """ + Publishes the given event to the message broker. + + When a list of message brokers are given the event is published to the first message broker + available. Therefore the message brokers should share the data (ex: Sharing the KahaDB in ActiveMQ). + + When the event cannot be published, it will be retried until the mb_publisher_timeout is exceeded. + This value is set in the agent.conf. + + :param event: + :return: True if the event was published. + """ + try: + self.__log.debug("Publishing [event] %s to %s:%s" % (event.__class__.__name__, mb_ip, mb_port)) + publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port, auth=auth) + self.__log.debug("[Event] %s published to MB: %s:%s" % (str(event.__class__.__name__), mb_ip, mb_port)) + self.__msg_queue.put(True) + except Exception as err: + self.__log.debug( + "Could not publish [event] %s to message broker %s:%s. : %s" + % (str(event.__class__.__name__), mb_ip, mb_port, err)) + self.__msg_queue.put(False) http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py index ff5cef9..c5a6d2d 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py @@ -178,7 +178,6 @@ class EventExecutor(threading.Thread): def __init__(self, event_queue): threading.Thread.__init__(self) self.__event_queue = event_queue - # TODO: several handlers for one event self.__event_handlers = {} EventSubscriber.log = LogFactory().get_log(__name__) @@ -191,10 +190,9 @@ class EventExecutor(threading.Thread): try: EventSubscriber.log.debug("Executing handler for event %r" % event) handler(event_msg) - except: - EventSubscriber.log.exception("Error processing %r event" % event) + except Exception as err: + EventSubscriber.log.exception("Error processing %r event: %s" % (event, err)) else: - EventSubscriber.log.debug("Event handler not found for event : %r" % event) def register_event_handler(self, event, handler): @@ -226,7 +224,7 @@ class MessageBrokerHeartBeatChecker(AbstractAsyncScheduledTask): try: self.__mb_client.connect(self.__mb_ip, self.__mb_port, 60) self.__mb_client.disconnect() - except: + except Exception: self.__log.info( "Message broker %s:%s cannot be reached. Disconnecting client..." % (self.__mb_ip, self.__mb_port)) self.__connected_client.disconnect() http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java index 2398099..ab4975a 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java @@ -67,6 +67,11 @@ public class ADCExtensionTestCase extends PythonAgentIntegrationTest { startServerSocket(8080); } + @Override + protected String getClassName() { + return this.getClass().getSimpleName(); + } + /** * TearDown method for test method testPythonCartridgeAgent */ http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java index 6e40dd6..05d5ba2 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java @@ -70,6 +70,11 @@ public class ADCMTAppTenantUserTestCase extends PythonAgentIntegrationTest { startServerSocket(8080); } + @Override + protected String getClassName() { + return this.getClass().getSimpleName(); + } + /** * TearDown method for test method testPythonCartridgeAgent */ http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java index 6f0b070..444a5e0 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java @@ -72,6 +72,11 @@ public class ADCMTAppTestCase extends PythonAgentIntegrationTest { startServerSocket(8080); } + @Override + protected String getClassName() { + return this.getClass().getSimpleName(); + } + /** * TearDown method for test method testPythonCartridgeAgent */ http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java index 0dc92be..dba6197 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java @@ -63,6 +63,11 @@ public class ADCTestCase extends PythonAgentIntegrationTest { public ADCTestCase() throws IOException { } + @Override + protected String getClassName() { + return this.getClass().getSimpleName(); + } + @BeforeMethod(alwaysRun = true) public void setupADCTest() throws Exception { log.info("Setting up ADCTestCase"); http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java index ea156b6..db21359 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java @@ -79,6 +79,11 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest { startServerSocket(8080); } + @Override + protected String getClassName() { + return this.getClass().getSimpleName(); + } + /** * TearDown method for test method testPythonCartridgeAgent */ http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java index 44d295b..ce13d3f 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java @@ -76,6 +76,11 @@ public class CEPHAModeTestCase extends PythonAgentIntegrationTest { } + @Override + protected String getClassName() { + return this.getClass().getSimpleName(); + } + @BeforeMethod(alwaysRun = true) public void setupCEPHAModeTest() throws Exception { http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java index b1f4d8b..8c72f2d 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java @@ -42,6 +42,11 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { public MessageBrokerHATestCase() throws IOException { } + @Override + protected String getClassName() { + return this.getClass().getSimpleName(); + } + private static final Log log = LogFactory.getLog(MessageBrokerHATestCase.class); private static final int HA_TEST_TIMEOUT = 300000; private static final String CLUSTER_ID = "php.php.domain"; @@ -169,10 +174,11 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { MEMBER_ID); publishEvent(instanceCleanupMemberEvent); publishCleanupEvent = true; - waitUntilCleanupEventIsReceivedAndStopDefaultMB(); + + stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]); } - if (line.contains("Could not publish event to message broker localhost:1885.")) { + if (line.contains("Could not publish [event] ")) { log.info("Event publishing to default message broker failed and the next option is tried."); exit = true; } @@ -186,26 +192,6 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { log.info("MessageBrokerHATestCase publisher test completed successfully."); } - private void waitUntilCleanupEventIsReceivedAndStopDefaultMB() { - boolean eventReceived = false; - List<String> outputLines = new ArrayList<>(); - - while (!eventReceived) { - List<String> newLines = getNewLines(outputLines, outputStream.toString()); - if (newLines.size() > 0) { - for (String line : newLines) { - if (line.contains("Message received: instance/notifier/InstanceCleanupMemberEvent")) { - // take down the default broker - stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]); - eventReceived = true; - } - } - } - log.info("Waiting until cleanup event is received by PCA..."); - } - log.info("Cleanup event is received by PCA."); - } - private void assertAgentActivation() { pcaActivated = false; instanceActivated = false;
