http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python-cartridge-agent/test/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/test/__init__.py b/tools/python-cartridge-agent/test/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/tools/python-cartridge-agent/test/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License.
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python-cartridge-agent/test/asynctest.txt ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/test/asynctest.txt b/tools/python-cartridge-agent/test/asynctest.txt deleted file mode 100644 index 623c418..0000000 --- a/tools/python-cartridge-agent/test/asynctest.txt +++ /dev/null @@ -1 +0,0 @@ -1413799652508.8130 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python-cartridge-agent/test/test_util.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/test/test_util.py b/tools/python-cartridge-agent/test/test_util.py deleted file mode 100644 index f62b2e8..0000000 --- a/tools/python-cartridge-agent/test/test_util.py +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from cartridgeagent.modules.util.asyncscheduledtask import * -import time - -def test_async_task(): - test_task = TestTask() - astask = ScheduledExecutor(2, test_task) - start_time = time.time() * 1000 - astask.start() - time.sleep(3) - astask.terminate() - f = open("asynctest.txt", "r") - end_time = float(f.read()) - assert (end_time - start_time) >= 2 * 1000, "Task was executed before specified delay" - - -class TestTask(AbstractAsyncScheduledTask): - - def execute_task(self): - with open("asynctest.txt", "w") as f: - f.seek(0) - f.truncate() - f.write("%1.4f" % (time.time()*1000)) http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/__init__.py b/tools/python_cartridgeagent/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/__init__.py b/tools/python_cartridgeagent/cartridgeagent/__init__.py new file mode 100644 index 0000000..d216be4 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/agent.conf ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/agent.conf b/tools/python_cartridgeagent/cartridgeagent/agent.conf new file mode 100644 index 0000000..5c087e9 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/agent.conf @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[agent] +mb.ip =MB-IP +mb.port =MB-PORT +listen.address =LISTEN_ADDR +thrift.receiver.ip =CEP-IP +thrift.receiver.port =CEP-PORT +thrift.server.admin.username =CEP-ADMIN-USERNAME +thrift.server.admin.password =CEP-ADMIN-PASSWORD +param.file.path =/mnt/cartridge-agent/payload/launch-params +extensions.dir =/mnt/cartridge-agent/extensions +cep.stats.publisher.enabled =ENABLE_HEALTH_PUBLISHER +lb.private.ip =LB_PRIVATE_IP +lb.public.ip =LB_PUBLIC_IP +enable.artifact.update =ENABLE_ARTFCT_UPDATE +auto.commit =COMMIT_ENABLED +auto.checkout =CHECKOUT_ENABLED +artifact.update.interval =ARTFCT_UPDATE_INT +port.check.timeout =PORT_CHECK_TIMEOUT +enable.data.publisher =ENABLE-DATA-PUBLISHER +monitoring.server.ip =MONITORING-SERVER-IP +monitoring.server.port =MONITORING-SERVER-PORT +monitoring.server.secure.port =MONITORING-SERVER-SECURE-PORT +monitoring.server.admin.username =MONITORING-SERVER-ADMIN-USERNAME +monitoring.server.admin.password =MONITORING-SERVER-ADMIN-PASSWORD +log.file.paths =LOG_FILE_PATHS +APP_PATH =APP-PATH +super.tenant.repository.path =/repository/deployment/server/ +tenant.repository.path =/repository/tenants/ +extension.instance.started =instance-started.sh +extension.start.servers =start-servers.sh +extension.instance.activated =instance-activated.sh +extension.artifacts.updated =artifacts-updated.sh +extension.clean =clean.sh +extension.mount.volumes =mount_volumes.sh +extension.member.started =member-started.sh +extension.member.activated =member-activated.sh +extension.member.suspended =member-suspended.sh +extension.member.terminated =member-terminated.sh +extension.complete.topology =complete-topology.sh +extension.complete.tenant =complete-tenant.sh +extension.subscription.domain.added =subscription-domain-added.sh +extension.subscription.domain.removed =subscription-domain-removed.sh +extension.artifacts.copy =artifacts-copy.sh +extension.tenant.subscribed =tenant-subscribed.sh +extension.tenant.unsubscribed =tenant-unsubscribed.sh \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/agent.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/agent.py b/tools/python_cartridgeagent/cartridgeagent/agent.py new file mode 100644 index 0000000..9f1a972 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/agent.py @@ -0,0 +1,343 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import threading + +from modules.exception.parameternotfoundexception import ParameterNotFoundException +from modules.subscriber import eventsubscriber +from modules.publisher import cartridgeagentpublisher +from modules.event.instance.notifier.events import * +from modules.event.tenant.events import * +from modules.event.topology.events import * +from modules.tenant.tenantcontext import * +from modules.topology.topologycontext import * +from modules.datapublisher.logpublisher import * +from modules.config import cartridgeagentconfiguration +from modules.extensions import defaultextensionhandler + + +class CartridgeAgent(threading.Thread): + extension_handler = defaultextensionhandler.DefaultExtensionHandler() + + def __init__(self): + threading.Thread.__init__(self) + + mb_ip = cartridgeagentconfiguration.CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_IP) + mb_port = cartridgeagentconfiguration.CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_PORT) + + self.__instance_event_subscriber = eventsubscriber.EventSubscriber( + cartridgeagentconstants.INSTANCE_NOTIFIER_TOPIC, + mb_ip, + mb_port) + self.__tenant_event_subscriber = eventsubscriber.EventSubscriber( + cartridgeagentconstants.TENANT_TOPIC, + mb_ip, + mb_port) + self.__topology_event_subscriber = eventsubscriber.EventSubscriber( + cartridgeagentconstants.TOPOLOGY_TOPIC, + mb_ip, + mb_port) + + self.__tenant_context_initialized = False + + self.log_publish_manager = None + + self.terminated = False + + self.log = LogFactory().get_log(__name__) + + self.cartridge_agent_config = CartridgeAgentConfiguration() + + def run(self): + self.log.info("Starting Cartridge Agent...") + + #Check if required prpoerties are set + self.validate_required_properties() + + #Start instance notifier listener thread + self.subscribe_to_topics_and_register_listeners() + + #Start topology event receiver thread + self.register_topology_event_listeners() + + #Start tenant event receiver thread + self.register_tenant_event_listeners() + + #wait for intance spawned event + while not self.cartridge_agent_config.initialized: + self.log.debug("Waiting for Cartridge Agent to be initialized...") + time.sleep(1) + + #Execute instance started shell script + CartridgeAgent.extension_handler.on_instance_started_event() + + #Publish instance started event + cartridgeagentpublisher.publish_instance_started_event() + + #Execute start servers extension + try: + CartridgeAgent.extension_handler.start_server_extension() + except: + self.log.exception("Error processing start servers event") + + #Wait for all ports to be active + cartridgeagentutils.wait_until_ports_active( + self.cartridge_agent_config.listen_address, + self.cartridge_agent_config.ports, + int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False)) + ) + + # check if artifact management is required before publishing instance activated event + repo_url = self.cartridge_agent_config.repo_url + if repo_url is None or str(repo_url).strip() == "": + self.log.info("No artifact repository found") + CartridgeAgent.extension_handler.on_instance_activated_event() + + cartridgeagentpublisher.publish_instance_activated_event() + + persistence_mappping_payload = self.cartridge_agent_config.persistence_mappings + if persistence_mappping_payload is not None: + CartridgeAgent.extension_handler.volume_mount_extension(persistence_mappping_payload) + + # start log publishing thread + if DataPublisherConfiguration.get_instance().enabled: + log_file_paths = self.cartridge_agent_config.log_file_paths + if log_file_paths is None: + self.log.exception("No valid log file paths found, no logs will be published") + else: + self.log_publish_manager = LogPublisherManager(log_file_paths) + self.log_publish_manager.start() + + while not self.terminated: + time.sleep(1) + + if DataPublisherConfiguration.get_instance().enabled: + self.log_publish_manager.terminate_all_publishers() + + def terminate(self): + """ + Allows the CartridgeAgent thread to be terminated + + :return: void + """ + self.terminated = True + + def validate_required_properties(self): + """ + Checks if required properties are set + :return: void + """ + #PARAM_FILE_PATH + try: + self.cartridge_agent_config.read_property(cartridgeagentconstants.PARAM_FILE_PATH) + except ParameterNotFoundException: + self.log.error("System property not found: %r" % cartridgeagentconstants.PARAM_FILE_PATH) + return + + #EXTENSIONS_DIR + try: + self.cartridge_agent_config.read_property(cartridgeagentconstants.EXTENSIONS_DIR) + except ParameterNotFoundException: + self.log.error("System property not found: %r" % cartridgeagentconstants.EXTENSIONS_DIR) + return + + def subscribe_to_topics_and_register_listeners(self): + self.log.debug("Starting instance notifier event message receiver thread") + + self.__instance_event_subscriber.register_handler("ArtifactUpdatedEvent", self.on_artifact_updated) + self.__instance_event_subscriber.register_handler("InstanceCleanupMemberEvent", self.on_instance_cleanup_member) + self.__instance_event_subscriber.register_handler("InstanceCleanupClusterEvent", self.on_instance_cleanup_cluster) + + self.__instance_event_subscriber.start() + self.log.info("Instance notifier event message receiver thread started") + + # wait till subscribed to continue + while not self.__instance_event_subscriber.is_subscribed(): + time.sleep(2) + + def on_artifact_updated(self, msg): + event_obj = ArtifactUpdatedEvent.create_from_json(msg.payload) + CartridgeAgent.extension_handler.on_artifact_updated_event(event_obj) + + def on_instance_cleanup_member(self, msg): + member_in_payload = self.cartridge_agent_config.member_id + event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload) + member_in_event = event_obj.member_id + if member_in_payload == member_in_event: + CartridgeAgent.extension_handler.on_instance_cleanup_member_event(event_obj) + + def on_instance_cleanup_cluster(self, msg): + event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload) + cluster_in_payload = self.cartridge_agent_config.cluster_id + cluster_in_event = event_obj.cluster_id + + if cluster_in_event == cluster_in_payload: + CartridgeAgent.extension_handler.on_instance_cleanup_cluster_event(event_obj) + + def register_topology_event_listeners(self): + self.log.debug("Starting topology event message receiver thread") + + self.__topology_event_subscriber.register_handler("MemberActivatedEvent", self.on_member_activated) + self.__topology_event_subscriber.register_handler("MemberTerminatedEvent", self.on_member_terminated) + self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", self.on_member_suspended) + self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", self.on_complete_topology) + self.__topology_event_subscriber.register_handler("MemberStartedEvent", self.on_member_started) + self.__topology_event_subscriber.register_handler("InstanceSpawnedEvent", self.on_instance_spawned) + + self.__topology_event_subscriber.start() + self.log.info("Cartridge Agent topology receiver thread started") + + def on_instance_spawned(self, msg): + self.log.debug("Instance spawned event received: %r" % msg.payload) + if self.cartridge_agent_config.initialized: + return + + event_obj = InstanceSpawnedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_instance_spawned_event(event_obj) + except: + self.log.exception("Error processing instance spawned event") + + def on_member_activated(self, msg): + self.log.debug("Member activated event received: %r" % msg.payload) + if not self.cartridge_agent_config.initialized: + return + + event_obj = MemberActivatedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_member_activated_event(event_obj) + except: + self.log.exception("Error processing member activated event") + + def on_member_terminated(self, msg): + self.log.debug("Member terminated event received: %r" % msg.payload) + if not self.cartridge_agent_config.initialized: + return + + event_obj = MemberTerminatedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_member_terminated_event(event_obj) + except: + self.log.exception("Error processing member terminated event") + + def on_member_suspended(self, msg): + self.log.debug("Member suspended event received: %r" % msg.payload) + if not self.cartridge_agent_config.initialized: + return + + event_obj = MemberSuspendedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_member_suspended_event(event_obj) + except: + self.log.exception("Error processing member suspended event") + + def on_complete_topology(self, msg): + if not self.cartridge_agent_config.initialized: + self.log.debug("Complete topology event received") + event_obj = CompleteTopologyEvent.create_from_json(msg.payload) + TopologyContext.update(event_obj.topology) + try: + CartridgeAgent.extension_handler.on_complete_topology_event(event_obj) + except: + self.log.exception("Error processing complete topology event") + else: + self.log.info("Complete topology event updating task disabled") + + def on_member_started(self, msg): + self.log.debug("Member started event received: %r" % msg.payload) + if not self.cartridge_agent_config.initialized: + return + + event_obj = MemberStartedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_member_started_event(event_obj) + except: + self.log.exception("Error processing member started event") + + def register_tenant_event_listeners(self): + self.log.debug("Starting tenant event message receiver thread") + self.__tenant_event_subscriber.register_handler("SubscriptionDomainAddedEvent", self.on_subscription_domain_added) + self.__tenant_event_subscriber.register_handler("SubscriptionDomainsRemovedEvent", self.on_subscription_domain_removed) + self.__tenant_event_subscriber.register_handler("CompleteTenantEvent", self.on_complete_tenant) + self.__tenant_event_subscriber.register_handler("TenantSubscribedEvent", self.on_tenant_subscribed) + self.__tenant_event_subscriber.register_handler("TenantUnSubscribedEvent", self.on_tenant_unsubscribed) + + self.__tenant_event_subscriber.start() + self.log.info("Tenant event message receiver thread started") + + def on_subscription_domain_added(self, msg): + self.log.debug("Subscription domain added event received : %r" % msg.payload) + event_obj = SubscriptionDomainAddedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_subscription_domain_added_event(event_obj) + except: + self.log.exception("Error processing subscription domains added event") + + def on_subscription_domain_removed(self, msg): + self.log.debug("Subscription domain removed event received : %r" % msg.payload) + event_obj = SubscriptionDomainRemovedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_subscription_domain_removed_event(event_obj) + except: + self.log.exception("Error processing subscription domains removed event") + + def on_complete_tenant(self, msg): + if not self.__tenant_context_initialized: + self.log.debug("Complete tenant event received") + event_obj = CompleteTenantEvent.create_from_json(msg.payload) + TenantContext.update(event_obj.tenants) + + try: + CartridgeAgent.extension_handler.on_complete_tenant_event(event_obj) + self.__tenant_context_initialized = True + except: + self.log.exception("Error processing complete tenant event") + else: + self.log.info("Complete tenant event updating task disabled") + + def on_tenant_subscribed(self, msg): + self.log.debug("Tenant subscribed event received: %r" % msg.payload) + event_obj = TenantSubscribedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_tenant_subscribed_event(event_obj) + except: + self.log.exception("Error processing tenant subscribed event") + + def on_tenant_unsubscribed(self, msg): + self.log.debug("Tenant unSubscribed event received: %r" % msg.payload) + event_obj = TenantUnsubscribedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_tenant_unsubscribed_event(event_obj) + except: + self.log.exception("Error processing tenant unSubscribed event") + + +def main(): + cartridge_agent = CartridgeAgent() + log = LogFactory().get_log(__name__) + + try: + log.debug("Starting cartridge agent") + cartridge_agent.start() + except: + log.exception("Cartridge Agent Exception") + cartridge_agent.terminate() + + +if __name__ == "__main__": + main() http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/logging.ini ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/logging.ini b/tools/python_cartridgeagent/cartridgeagent/logging.ini new file mode 100644 index 0000000..3e49a96 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/logging.ini @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +[formatters] +keys=default + +[formatter_default] +format=%(asctime)s:%(levelname)s:%(message)s +class=logging.Formatter + +[handlers] +keys=console, error_file, log_file + +[handler_console] +class=logging.StreamHandler +formatter=default +args=tuple() + +[handler_log_file] +class=logging.FileHandler +level=LOG_LEVEL +formatter=default +args=("agent.log", "w") + +[handler_error_file] +class=logging.FileHandler +level=ERROR +formatter=default +args=("error.log", "w") + +[loggers] +keys=root + +[logger_root] +level=LOG_LEVEL +formatter=default +handlers=console,error_file,log_file \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/__init__.py new file mode 100644 index 0000000..d216be4 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/__init__.py new file mode 100644 index 0000000..2456923 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/__init__.py new file mode 100644 index 0000000..2456923 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/agentgithandler.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/agentgithandler.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/agentgithandler.py new file mode 100644 index 0000000..6da9c58 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/agentgithandler.py @@ -0,0 +1,503 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from threading import current_thread, Thread +from git import * +from gittle import Gittle, GittleAuth # GitPython and Gittle are both used at the time being for pros and cons of both +import urllib2 + +from ... util.log import LogFactory +from ... util import cartridgeagentutils, extensionutils, cartridgeagentconstants +from gitrepository import GitRepository +from ... config import cartridgeagentconfiguration +from ... util.asyncscheduledtask import AbstractAsyncScheduledTask, ScheduledExecutor +from ... artifactmgt.repositoryinformation import RepositoryInformation + +class AgentGitHandler: + """ + Handles all the git artifact management tasks related to a cartridge + """ + + log = LogFactory().get_log(__name__) + + SUPER_TENANT_ID = -1234 + SUPER_TENANT_REPO_PATH = "/repository/deployment/server/" + TENANT_REPO_PATH = "/repository/tenants/" + + extension_handler = None + + __git_repositories = {} + # (tenant_id => gitrepository.GitRepository) + + cartridge_agent_config = cartridgeagentconfiguration.CartridgeAgentConfiguration() + + @staticmethod + def checkout(repo_info): + """ + Checks out the code from the remote repository. + If local repository path is empty, a clone operation is done. + If there is a cloned repository already on the local repository path, a pull operation + will be performed. + If there are artifacts not in the repository already on the local repository path, + they will be added to a git repository, the remote url added as origin, and then + a pull operation will be performed. + + :param RepositoryInformation repo_info: The repository information object + :return: A tuple containing whether it was an initial clone or not, and the repository + context object + :rtype: tuple(bool, GitRepository) + """ + repo_context = AgentGitHandler.get_repo_context(repo_info.tenant_id) + if repo_context is not None: + #has been previously cloned, this is not the subscription run + subscribe_run = False + if AgentGitHandler.is_valid_git_repository(repo_context): + AgentGitHandler.log.debug("Existing git repository detected for tenant %r, no clone required" % repo_info.tenant_id) + AgentGitHandler.pull(repo_context) + else: + if not os.listdir(repo_context.local_repo_path): + #empty dir, clone + repo_context.repo = AgentGitHandler.clone(repo_info) + else: + #not empty + if AgentGitHandler.sync_initial_local_artifacts(repo_context): + AgentGitHandler.pull(repo_context) + else: + repo_context = None + else: + #subscribing run.. need to clone + subscribe_run = True + repo_context = AgentGitHandler.clone(repo_context) + + return subscribe_run, repo_context + + @staticmethod + def sync_initial_local_artifacts(repo_context): + #init git repo + AgentGitHandler.init(repo_context.local_repo_path) + + # add remote repos + return AgentGitHandler.add_remote(repo_context) + + @staticmethod + def add_remote(repo_context): + try: + #add origin remote + repo_context.repo.create_remote("origin", repo_context.repo_url) + #fetch branch details from origin + repo_context.repo.git.fetch() + #checkout master branch from origin/master as tracking + repo_context.repo.git.branch("-f", "--track", "master", "origin/master") + return True + except: + AgentGitHandler.log.exception("Error in adding remote origin %r for local repository %r" + % (repo_context.repo_url, repo_context.local_repo_path)) + return False + + @staticmethod + def init(path): + try: + repo = Gittle.init(path) + return repo + except: + AgentGitHandler.log.exception("Initializing local repo at %r failed" % path) + raise Exception("Initializing local repo at %r failed" % path) + + @staticmethod + def is_valid_git_repository(repo_context): + if repo_context.cloned: + return True + + for ref in repo_context.repo.refs: + try: + ref._get_object() + except ValueError: + return False + + return True + + @staticmethod + def pull(repo_context): + repo = Repo(repo_context.local_repo_path) + from ....agent import CartridgeAgent + AgentGitHandler.extension_handler = CartridgeAgent.extension_handler + try: + repo.git.checkout("master") + pull_output = repo.git.pull() + if "Already up-to-date." not in pull_output: + AgentGitHandler.log.debug("Artifacts were updated as a result of the pull operation, thread: %r - %r" % (current_thread().getName(), current_thread().ident)) + + AgentGitHandler.extension_handler.on_artifact_update_scheduler_event(repo_context.tenant_id) + except GitCommandError as ex: + if "fatal: Could not read from remote repository." in ex: + #invalid configuration, need to delete and reclone + AgentGitHandler.log.warn("Git pull unsuccessful for tenant %r, invalid configuration. %r" % (repo_context.tenant_id, ex)) + cartridgeagentutils.delete_folder_tree(repo_context.local_repo_path) + AgentGitHandler.clone(RepositoryInformation( + repo_context.repo_url, + repo_context.repo_username, + repo_context.repo_password, + repo_context.local_repo_path, + repo_context.tenant_id, + repo_context.is_multitenant, + repo_context.commit_enabled + )) + AgentGitHandler.extension_handler.on_artifact_update_scheduler_event(repo_context.tenant_id) + elif "error: Your local changes to the following files would be overwritten by merge:" in ex: + #conflict error + AgentGitHandler.log.warn("Git pull unsuccessful for tenant %r, conflicts detected." % repo_context.tenant_id) + #raise ex + + """ + 0:'git pull' returned exit status 1: error: Your local changes to the following files would be overwritten by merge: + 1: README.md + 2: index.php + 3:Please, commit your changes or stash them before you can merge. + 4:Aborting + """ + conflict_list = [] + files_arr = str(ex).split("\n") + for file_index in range(1, len(files_arr) - 2): + file_name = files_arr[file_index].strip() + conflict_list.append(file_name) + AgentGitHandler.log.debug("Added the file path %r to checkout from the remote repository" % file_name) + + AgentGitHandler.checkout_individually(conflict_list, repo) + elif "fatal: unable to access " in ex: + #transport error + AgentGitHandler.log.exception("Accessing remote git repository %r failed for tenant %r" % (repo_context.repo_url, repo_context.tenant_id)) + else: + AgentGitHandler.log.exception("Git pull operation for tenant %r failed" % repo_context.tenant_id) + + @staticmethod + def checkout_individually(conflict_list, repo): + try: + for conflicted_file in conflict_list: + repo.git.checkout(conflicted_file) + AgentGitHandler.log.info("Checked out the conflicting files from the remote repository successfully") + except: + AgentGitHandler.log.exception("Checking out artifacts from index failed") + + @staticmethod + def clone(repo_info): + repo_context = None + try: + repo_context = AgentGitHandler.create_git_repo_context(repo_info) + #create the directory if it doesn't exist + if not os.path.isdir(repo_context.local_repo_path): + cartridgeagentutils.create_dir(repo_context.local_repo_path) + + auth = AgentGitHandler.create_auth_configuration(repo_context) + + if auth is not None: + # authentication is required, use Gittle + gittle_repo = Gittle.clone(repo_context.repo_url, repo_context.local_repo_path, auth=auth) + repo = Repo(repo_context.local_repo_path) + else: + # authentication is not required, use GitPython + repo = Repo.clone_from(repo_context.repo_url, repo_context.local_repo_path) + gittle_repo = Gittle(repo_context.local_repo_path) + + repo_context.cloned = True + repo_context.gittle_repo = gittle_repo + repo_context.repo = repo + AgentGitHandler.add_repo_context(repo_context) + AgentGitHandler.log.info("Git clone operation for tenant %r successful" % repo_context.tenant_id) + except urllib2.URLError: + AgentGitHandler.log.exception("Accessing remote git repository failed for tenant %r" % repo_context.tenant_id) + except OSError: + AgentGitHandler.log.exception("Permission denied for repository path for tenant %r" % repo_context.tenant_id) + except: + AgentGitHandler.log.exception("Git clone operation for tenant %r failed" % repo_context.tenant_id) + finally: + return repo_context + + @staticmethod + def create_auth_configuration(repo_context): + """ + Creates a GittleAuth object based on the type of authorization + :param GitRepository repo_context: The repository context object + :return: GittleAuth object or None if no authorization needed + :rtype: GittleAuth + """ + if repo_context.key_based_auth: + pkey = AgentGitHandler.get_private_key() + auth = GittleAuth(pkey=pkey) + elif repo_context.repo_username.strip() != "" and repo_context.repo_password.strip() != "": + auth = GittleAuth(username=repo_context.repo_username, password=repo_context.repo_password) + else: + auth = None + + return auth + + @staticmethod + def get_private_key(): + """ + Returns a file handler to the private key path specified by Carbon or default if not specified + by Carbon + :return: The file object of the private key file + :rtype: file + """ + pkey_name = cartridgeagentutils.get_carbon_server_property("SshPrivateKeyName") + if pkey_name is None: + pkey_name = "wso2" + + pkey_path = cartridgeagentutils.get_carbon_server_property("SshPrivateKeyPath") + if pkey_path is None: + pkey_path = os.environ["HOME"] + "/.ssh" + + if pkey_path.endswith("/"): + pkey_ptr = pkey_path + pkey_name + else: + pkey_ptr = pkey_path + "/" + pkey_name + + pkey_file = open(pkey_ptr) + + return pkey_file + + + @staticmethod + def add_repo_context(repo_context): + AgentGitHandler.__git_repositories[repo_context.tenant_id] = repo_context + + @staticmethod + def get_repo_context(tenant_id): + """ + + :param int tenant_id: + :return: GitRepository object + :rtype: GitRepository + """ + if tenant_id in AgentGitHandler.__git_repositories: + return AgentGitHandler.__git_repositories[tenant_id] + + return None + + @staticmethod + def remove_repo_context(tenant_id): + if tenant_id in AgentGitHandler.__git_repositories: + del AgentGitHandler.__git_repositories[tenant_id] + + @staticmethod + def create_git_repo_context(repo_info): + repo_context = GitRepository() + repo_context.tenant_id = repo_info.tenant_id + repo_context.local_repo_path = AgentGitHandler.get_repo_path_for_tenant( + repo_info.tenant_id, repo_info.repo_path, repo_info.is_multitenant) + repo_context.repo_url = repo_info.repo_url + repo_context.repo_username = repo_info.repo_username + repo_context.repo_password = repo_info.repo_password + repo_context.is_multitenant = repo_info.is_multitenant + repo_context.commit_enabled = repo_info.commit_enabled + + if AgentGitHandler.is_key_based_auth(repo_info.repo_url, repo_info.tenant_id): + repo_context.key_based_auth = True + else: + repo_context.key_based_auth = False + + repo_context.cloned = False + + repo_context.repo = None + repo_context.gittle_repo = None + + return repo_context + + @staticmethod + def is_key_based_auth(repo_url, tenant_id): + """ + Checks if the given git repo has key based authentication + :param str repo_url: Git repository remote url + :param str tenant_id: Tenant ID + :return: True if key based, False otherwise + :rtype: bool + """ + if repo_url.startswith("http://") or repo_url.startswith("https://"): + # username and password, not key based + return False + elif repo_url.startswith("git://github.com"): + # no auth required + return False + elif repo_url.startswith("ssh://") or "@" in repo_url: + # key based + return True + else: + AgentGitHandler.log.error("Invalid git URL provided for tenant " + tenant_id) + raise RuntimeError("Invalid git URL provided for tenant " + tenant_id) + + @staticmethod + def get_repo_path_for_tenant(tenant_id, git_local_repo_path, is_multitenant): + repo_path = "" + + if is_multitenant: + if tenant_id == AgentGitHandler.SUPER_TENANT_ID: + #super tenant, /repository/deploy/server/ + super_tenant_repo_path = AgentGitHandler.cartridge_agent_config.super_tenant_repository_path + #"app_path" + repo_path += git_local_repo_path + + if super_tenant_repo_path is not None and super_tenant_repo_path != "": + super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") else "/" + super_tenant_repo_path + super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") else super_tenant_repo_path + "/" + #"app_path/repository/deploy/server/" + repo_path += super_tenant_repo_path + else: + #"app_path/repository/deploy/server/" + repo_path += AgentGitHandler.SUPER_TENANT_REPO_PATH + + else: + #normal tenant, /repository/tenants/tenant_id + tenant_repo_path = AgentGitHandler.cartridge_agent_config.tenant_repository_path + #"app_path" + repo_path += git_local_repo_path + + if tenant_repo_path is not None and tenant_repo_path != "": + tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path + tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/" + #"app_path/repository/tenants/244653444" + repo_path += tenant_repo_path + tenant_id + else: + #"app_path/repository/tenants/244653444" + repo_path += AgentGitHandler.TENANT_REPO_PATH + tenant_id + + #tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id + cartridgeagentutils.create_dir(repo_path) + else: + #not multi tenant, app_path + repo_path = git_local_repo_path + + AgentGitHandler.log.debug("Repo path returned : %r" % repo_path) + return repo_path + + @staticmethod + def commit(repo_info): + """ + Commits and pushes new artifacts to the remote repository + :param repo_info: + :return: + """ + tenant_id = repo_info.tenant_id + repo_context = AgentGitHandler.get_repo_context(tenant_id) + gittle_repo = repo_context.gittle_repo + try: + modified = True if gittle_repo.modified_unstaged_files.count > 0 else False + except OSError: + # removed files + modified = True + + if not modified: + AgentGitHandler.log.debug("No changes detected in the local repository for tenant " + tenant_id) + return + + gittle_repo.stage(gittle_repo.untracked_files) + gittle_repo.stage(gittle_repo.removed_files) + gittle_repo.stage(gittle_repo.modified_unstaged_files) + + #commit to local repositpory + commit_message = "tenant " + tenant_id + "'s artifacts committed to local repo at " + repo_context.local_repo_path + + try: + commit_hash = gittle_repo.commit(name="First Author", email="[email protected]", message=commit_message) + AgentGitHandler.log.debug("Committed artifacts for tenant : " + tenant_id + " : " + commit_hash) + except: + AgentGitHandler.log.exception("Committing artifacts to local repository failed for tenant " + tenant_id) + + #push to remote + try: + repo = repo_context.repo + #TODO: check key based authentication + credentialed_remote_url = AgentGitHandler.get_credentialed_remote_url(repo_context) + push_remote = repo.create_remote('push_remote', credentialed_remote_url) + push_remote.push() + AgentGitHandler.log.debug("Pushed artifacts for tenant : " + tenant_id) + except: + AgentGitHandler.log.exception("Pushing artifacts to remote repository failed for tenant " + tenant_id) + + @staticmethod + def get_credentialed_remote_url(repo_context): + """ + Creates a remote url including the credentials + :param repo_context: + :return: + """ + username = repo_context.repo_username + password = repo_context.repo_password + + raise NotImplementedError + + @staticmethod + def schedule_artifact_update_scheduled_task(repo_info, auto_checkout, auto_commit, update_interval): + repo_context = AgentGitHandler.get_repo_context(repo_info.tenant_id) + + if repo_context is None: + AgentGitHandler.log.error("Unable to schedule artifact sync task, repositoryContext null for tenant %r" % repo_info.tenant_id) + return + + if repo_context.scheduled_update_task is None: + #TODO: make thread safe + artifact_update_task = ArtifactUpdateTask(repo_info, auto_checkout, auto_commit) + async_task = ScheduledExecutor(update_interval, artifact_update_task) + + repo_context.scheduled_update_task = async_task + async_task.start() + AgentGitHandler.log.info("Scheduled Artifact Synchronization Task for path %r" % repo_context.local_repo_path) + else: + AgentGitHandler.log.info("Artifact Synchronization Task for path %r already scheduled" % repo_context.local_repo_path) + + @staticmethod + def remove_repo(tenant_id): + repo_context = AgentGitHandler.get_repo_context(tenant_id) + + #stop artifact update task + repo_context.scheduled_update_task.terminate() + + #remove git contents + cartridgeagentutils.delete_folder_tree(repo_context.local_repo_path) + + AgentGitHandler.remove_repo_context(tenant_id) + + if tenant_id == -1234: + if AgentGitHandler.cartridge_agent_config.is_multitenant: + extensionutils.execute_copy_artifact_extension( + cartridgeagentconstants.SUPERTENANT_TEMP_PATH, + AgentGitHandler.cartridge_agent_config.app_path + "/repository/deployment/server/" + ) + + AgentGitHandler.log.info("git repository deleted for tenant %r" % repo_context.tenant_id) + + return True + + +class ArtifactUpdateTask(AbstractAsyncScheduledTask): + """ + Checks if the autocheckout and autocommit are enabled and executes respective tasks + """ + + def __init__(self, repo_info, auto_checkout, auto_commit): + self.log = LogFactory().get_log(__name__) + self.repo_info = repo_info + self.auto_checkout = auto_checkout + self.auto_commit = auto_commit + + def execute_task(self): + try: + if self.auto_checkout: + AgentGitHandler.checkout(self.repo_info) + except: + self.log.exception("Auto checkout task failed") + + if self.auto_commit: + AgentGitHandler.commit(self.repo_info) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/gitrepository.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/gitrepository.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/gitrepository.py new file mode 100644 index 0000000..98a8a44 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/git/gitrepository.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from ...util.asyncscheduledtask import AsyncScheduledTask +from gittle import Gittle +from git import * + +class GitRepository: + """ + Represents a git repository inside a particular instance + """ + + def __init__(self): + self.repo_url = None + """ :type : str """ + self.local_repo_path = None + """ :type : str """ + self.cloned = False + """ :type : bool """ + self.repo = None + """ :type : git.repo.base.Repo """ + self.gittle_repo = None + """ :type : gittle.gittle.Gittle """ + self.tenant_id = None + """ :type : int """ + self.key_based_auth = False + """ :type : bool """ + self.repo_username = None + """ :type : str """ + self.repo_password = None + """ :type : str """ + self.is_multitenant = False + """ :type : bool """ + self.commit_enabled = False + """ :type : bool """ + self.scheduled_update_task = None + """:type : AsyncScheduledTask """ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/repositoryinformation.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/repositoryinformation.py b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/repositoryinformation.py new file mode 100644 index 0000000..b67eada --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/artifactmgt/repositoryinformation.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class RepositoryInformation: + """ + Holds repository information to be used in artifact management + """ + + def __init__(self, repo_url, repo_username, repo_password, repo_path, tenant_id, is_multitenant, commit_enabled): + self.repo_url = repo_url + """ :type : str """ + self.repo_username = repo_username + """ :type : str """ + self.repo_password = repo_password + """ :type : str """ + self.repo_path = repo_path + """ :type : str """ + self.tenant_id = tenant_id + """ :type : int """ + self.is_multitenant = is_multitenant + """ :type : bool """ + self.commit_enabled = commit_enabled + """ :type : bool """ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/config/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/config/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/config/__init__.py new file mode 100644 index 0000000..2456923 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/config/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/config/cartridgeagentconfiguration.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/config/cartridgeagentconfiguration.py b/tools/python_cartridgeagent/cartridgeagent/modules/config/cartridgeagentconfiguration.py new file mode 100644 index 0000000..15871ba --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/config/cartridgeagentconfiguration.py @@ -0,0 +1,346 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import ConfigParser +import os +import socket + +from ..util.log import LogFactory + + +class CartridgeAgentConfiguration: + """ + Handles the configuration information of the particular Cartridge Agent + """ + class __CartridgeAgentConfiguration: + def __init__(self): + # set log level + self.log = LogFactory().get_log(__name__) + + self.payload_params = {} + self.properties = None + + self.service_group = None + """ :type : str """ + self.is_clustered = False + """ :type : bool """ + self.service_name = None + """ :type : str """ + self.cluster_id = None + """ :type : str """ + self.network_partition_id = None + """ :type : str """ + self.partition_id = None + """ :type : str """ + self.member_id = None + """ :type : str """ + self.cartridge_key = None + """ :type : str """ + self.app_path = None + """ :type : str """ + self.repo_url = None + """ :type : str """ + self.ports = [] + """ :type : list[str] """ + self.log_file_paths = [] + """ :type : list[str] """ + self.is_multitenant = False + """ :type : bool """ + self.persistence_mappings = None + """ :type : str """ + self.is_commits_enabled = False + """ :type : bool """ + self.is_checkout_enabled = False + """ :type : bool """ + self.listen_address = None + """ :type : str """ + self.is_internal_repo = False + """ :type : bool """ + self.tenant_id = None + """ :type : str """ + self.lb_cluster_id = None + """ :type : str """ + self.min_count = None + """ :type : str """ + self.lb_private_ip = None + """ :type : str """ + self.lb_public_ip = None + """ :type : str """ + self.tenant_repository_path = None + """ :type : str """ + self.super_tenant_repository_path = None + """ :type : str """ + self.deployment = None + """ :type : str """ + self.manager_service_name = None + """ :type : str """ + self.worker_service_name = None + """ :type : str """ + self.is_primary = False + """ :type : bool """ + + self.payload_params = {} + self.__read_conf_file() + self.__read_parameter_file() + + self.initialized = False + """ :type : bool """ + + try: + self.service_group = self.payload_params[cartridgeagentconstants.SERVICE_GROUP] \ + if cartridgeagentconstants.SERVICE_GROUP in self.payload_params \ + else None + + if cartridgeagentconstants.CLUSTERING in self.payload_params and \ + str(self.payload_params[cartridgeagentconstants.CLUSTERING]).strip().lower() == "true": + self.is_clustered = True + else: + self.is_clustered = False + # self.__isClustered = self.payload_params[ + # cartridgeagentconstants.CLUSTERING] if cartridgeagentconstants.CLUSTERING in self.payload_params else None + + self.service_name = self.read_property(cartridgeagentconstants.SERVICE_NAME) + self.cluster_id = self.read_property(cartridgeagentconstants.CLUSTER_ID) + self.network_partition_id = self.read_property(cartridgeagentconstants.NETWORK_PARTITION_ID, False) + self.partition_id = self.read_property(cartridgeagentconstants.PARTITION_ID, False) + self.member_id = self.get_member_id(cartridgeagentconstants.MEMBER_ID) + self.cartridge_key = self.read_property(cartridgeagentconstants.CARTRIDGE_KEY) + self.app_path = self.read_property(cartridgeagentconstants.APP_PATH, False) + self.repo_url = self.read_property(cartridgeagentconstants.REPO_URL, False) + self.ports = str(self.read_property(cartridgeagentconstants.PORTS)).split("|") + + try: + self.log_file_paths = str( + self.read_property(cartridgeagentconstants.LOG_FILE_PATHS)).strip().split("|") + except ParameterNotFoundException as ex: + self.log.debug("Cannot read log file path : %r" % ex.get_message()) + self.log_file_paths = None + + is_multi_str = self.read_property(cartridgeagentconstants.MULTITENANT) + self.is_multitenant = True if str(is_multi_str).lower().strip() == "true" else False + + try: + self.persistence_mappings = self.read_property( + cartridgeagentconstants.PERSISTENCE_MAPPING) + except ParameterNotFoundException as ex: + self.log.debug("Cannot read persistence mapping : %r" % ex.get_message()) + self.persistence_mappings = None + + try: + is_commit_str = self.read_property(cartridgeagentconstants.COMMIT_ENABLED) + self.is_commits_enabled = True if str(is_commit_str).lower().strip() == "true" else False + except ParameterNotFoundException: + try: + is_commit_str = self.read_property(cartridgeagentconstants.AUTO_COMMIT) + self.is_commits_enabled = True if str(is_commit_str).lower().strip() == "true" else False + except ParameterNotFoundException: + self.log.info( + "%r is not found and setting it to false" % cartridgeagentconstants.COMMIT_ENABLED) + self.is_commits_enabled = False + + auto_checkout_str = self.read_property(cartridgeagentconstants.AUTO_CHECKOUT, False) + self.is_checkout_enabled = True if str(auto_checkout_str).lower().strip() == "true" else False + + self.listen_address = self.read_property( + cartridgeagentconstants.LISTEN_ADDRESS, False) + + try: + int_repo_str = self.read_property(cartridgeagentconstants.PROVIDER) + self.is_internal_repo = True if str(int_repo_str).strip().lower() == cartridgeagentconstants.INTERNAL else False + except ParameterNotFoundException: + self.log.info(" INTERNAL payload parameter is not found") + self.is_internal_repo = False + + self.tenant_id = self.read_property(cartridgeagentconstants.TENANT_ID) + self.lb_cluster_id = self.read_property(cartridgeagentconstants.LB_CLUSTER_ID, False) + self.min_count = self.read_property(cartridgeagentconstants.MIN_INSTANCE_COUNT, False) + self.lb_private_ip = self.read_property(cartridgeagentconstants.LB_PRIVATE_IP, False) + self.lb_public_ip = self.read_property(cartridgeagentconstants.LB_PUBLIC_IP, False) + self.tenant_repository_path = self.read_property(cartridgeagentconstants.TENANT_REPO_PATH, False) + self.super_tenant_repository_path = self.read_property(cartridgeagentconstants.SUPER_TENANT_REPO_PATH, False) + + try: + self.deployment = self.read_property( + cartridgeagentconstants.DEPLOYMENT) + except ParameterNotFoundException: + self.deployment = None + + # Setting worker-manager setup - manager service name + if self.deployment is None: + self.manager_service_name = None + + if str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower(): + self.manager_service_name = self.service_name + + elif str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): + self.deployment = self.read_property( + cartridgeagentconstants.MANAGER_SERVICE_TYPE) + + elif str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_DEFAULT.lower(): + self.deployment = None + else: + self.deployment = None + + # Setting worker-manager setup - worker service name + if self.deployment is None: + self.worker_service_name = None + + if str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): + self.manager_service_name = self.service_name + + elif str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower(): + self.deployment = self.read_property( + cartridgeagentconstants.WORKER_SERVICE_TYPE) + + elif str(self.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_DEFAULT.lower(): + self.deployment = None + else: + self.deployment = None + + try: + self.is_primary = self.read_property( + cartridgeagentconstants.CLUSTERING_PRIMARY_KEY) + except ParameterNotFoundException: + self.is_primary = None + except ParameterNotFoundException as ex: + raise RuntimeError(ex) + + self.log.info("Cartridge agent configuration initialized") + + self.log.debug("service-name: %r" % self.service_name) + self.log.debug("cluster-id: %r" % self.cluster_id) + self.log.debug( + "network-partition-id: %r" % self.network_partition_id) + self.log.debug("partition-id: %r" % self.partition_id) + self.log.debug("member-id: %r" % self.member_id) + self.log.debug("cartridge-key: %r" % self.cartridge_key) + self.log.debug("app-path: %r" % self.app_path) + self.log.debug("repo-url: %r" % self.repo_url) + self.log.debug("ports: %r" % str(self.ports)) + self.log.debug("lb-private-ip: %r" % self.lb_private_ip) + self.log.debug("lb-public-ip: %r" % self.lb_public_ip) + + def get_member_id(self, member_id_field): + """ + Reads the member id from the payload file or configuration file. If neither of + these sources contain the member id, the hostname is assigned to it and returned. + :param str member_id_field: the key of the member id to lookup + :return: The member id + :rtype : str + """ + try: + member_id = self.read_property(member_id_field) + except ParameterNotFoundException: + try: + self.log.info("Reading hostname from container") + member_id = socket.gethostname() + except: + self.log.exception("Hostname can not be resolved") + member_id = "unknown" + + self.log.debug("MemberId is taking the value of hostname : [" + member_id + "] ") + return member_id + + def __read_conf_file(self): + """ + Reads and stores the agent's configuration file + :return: void + """ + + conf_file_path = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "agent.conf" + self.log.debug("Config file path : %r" % conf_file_path) + self.properties = ConfigParser.SafeConfigParser() + self.properties.read(conf_file_path) + + def __read_parameter_file(self): + """ + Reads the payload file of the cartridge and stores the values in a dictionary + :return: void + """ + + param_file = self.read_property(cartridgeagentconstants.PARAM_FILE_PATH, False) + self.log.debug("Param file path : %r" % param_file) + + try: + if param_file is not None: + metadata_file = open(param_file) + metadata_payload_content = metadata_file.read() + for param in metadata_payload_content.split(","): + if param.strip() != "": + param_value = param.strip().split("=") + self.payload_params[param_value[0]] = param_value[1] + + # self.payload_params = dict( + # param.split("=") for param in metadata_payload_content.split(",")) + metadata_file.close() + else: + self.log.error("File not found: %r" % param_file) + except: + self.log.exception( + "Could not read launch parameter file, hence trying to read from System properties.") + + def read_property(self, property_key, critical=True): + """ + Returns the value of the provided property + :param str property_key: the name of the property to be read + :return: Value of the property, + :rtype: str + :exception: ParameterNotFoundException if the provided property cannot be found + """ + + if self.properties.has_option("agent", property_key): + self.log.debug("Has key: %r" % property_key) + temp_str = self.properties.get("agent", property_key) + if temp_str != "" and temp_str is not None: + if str(temp_str).strip().lower() == "null": + return "" + else: + return str(temp_str).strip() + + if property_key in self.payload_params: + temp_str = self.payload_params[property_key] + if temp_str != "" and temp_str is not None: + if str(temp_str).strip().lower() == "null": + return "" + else: + return str(temp_str).strip() + + if critical: + raise ParameterNotFoundException("Cannot find the value of required parameter: %r" % property_key) + + instance = None + """ :type : __CartridgeAgentConfiguration""" + + # def __new__(cls, *args, **kwargs): + # if not CartridgeAgentConfiguration.instance: + # CartridgeAgentConfiguration.instance = CartridgeAgentConfiguration.__CartridgeAgentConfiguration() + # + # return CartridgeAgentConfiguration.instance + + def __init__(self): + if not CartridgeAgentConfiguration.instance: + CartridgeAgentConfiguration.instance = CartridgeAgentConfiguration.__CartridgeAgentConfiguration() + + def __getattr__(self, name): + return getattr(self.instance, name) + + def __setattr__(self, name, value): + return setattr(self.instance, name, value) + + +from ..exception.parameternotfoundexception import ParameterNotFoundException +from ..util import cartridgeagentconstants http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/__init__.py new file mode 100644 index 0000000..2456923 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py new file mode 100644 index 0000000..1859d8a --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py @@ -0,0 +1,202 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from thrift.publisher import * +from ..util.log import * + + +class StreamDefinition: + """ + Represents a BAM/CEP stream definition + """ + + STRING = 'STRING' + DOUBLE = 'DOUBLE' + INT = 'INT' + LONG = 'LONG' + BOOL = 'BOOL' + + def __init__(self): + self.name = None + """:type : str""" + self.version = None + """:type : str""" + self.nickname = None + """:type : str""" + self.description = None + """:type : str""" + self.meta_data = [] + """:type : list[str]""" + self.correlation_data = [] + """:type : list[str]""" + self.payload_data = [] + """:type : list[str]""" + self.stream_id = None + """ :type : str """ + + def add_metadata_attribute(self, attr_name, attr_type): + self.meta_data.append({"name": attr_name, "type": attr_type}) + + def add_payloaddata_attribute(self, attr_name, attr_type): + self.payload_data.append({"name": attr_name, "type": attr_type}) + + def add_correlationdata_attribute(self, attr_name, attr_type): + self.correlation_data.append({"name": attr_name, "type": attr_type}) + + def __str__(self): + """ + To string override + """ + + json_str = "{" + json_str += "\"name\":\"" + self.name + "\"," + json_str += "\"version\":\"" + self.version + "\"," + json_str += "\"nickName\":\"" + self.nickname + "\"," + json_str += "\"description\":\"" + self.description + "\"," + + # add metadata attributes if exists + if len(self.meta_data) > 0: + json_str += "\"metaData\":[" + for metadatum in self.meta_data: + json_str += "{\"name\":\"" + metadatum["name"] + "\", \"type\": \"" + metadatum["type"] + "\"}," + + json_str = json_str[:-1] + "]," + + # add correlationdata attributes if exists + if len(self.correlation_data) > 0: + json_str += "\"correlationData\":[" + for coredatum in self.correlation_data: + json_str += "{\"name\":\"" + coredatum["name"] + "\", \"type\": \"" + coredatum["type"] + "\"}," + + json_str = json_str[:-1] + "]," + + # add payloaddata attributes if exists + if len(self.payload_data) > 0: + json_str += "\"payloadData\":[" + for payloaddatum in self.payload_data: + json_str += "{\"name\":\"" + payloaddatum["name"] + "\", \"type\": \"" + payloaddatum["type"] + "\"}," + + json_str = json_str[:-1] + "]," + + json_str = json_str[:-1] + "}" + + return json_str + + +class ThriftEvent: + """ + Represents an event to be published to a BAM/CEP monitoring server + """ + def __init__(self): + self.metaData = [] + """:type : list[str]""" + self.correlationData = [] + """:type : list[str]""" + self.payloadData = [] + """:type : list[str]""" + + +class ThriftPublisher: + """ + Handles publishing events to BAM/CEP through thrift using the provided address and credentials + """ + log = LogFactory().get_log(__name__) + + def __init__(self, ip, port, username, password, stream_definition): + """ + Initializes a ThriftPublisher object. + + At initialization a ThriftPublisher connects and defines a stream definition. A connection + should be disconnected after all the publishing has been done. + + :param str ip: IP address of the monitoring server + :param str port: Port of the monitoring server + :param str username: Username + :param str password: Password + :param StreamDefinition stream_definition: StreamDefinition object for this particular connection + :return: ThriftPublisher object + :rtype: ThriftPublisher + """ + try: + port_number = int(port) + except ValueError: + raise RuntimeError("Port number for Thrift Publisher is invalid: %r" % port) + + self.__publisher = Publisher(ip, port_number) + self.__publisher.connect(username, password) + self.__publisher.defineStream(str(stream_definition)) + + self.stream_definition = stream_definition + self.stream_id = self.__publisher.streamId + self.ip = ip + self.port = port + self.username = username + self.password = password + + def publish(self, event): + """ + Publishes the given event by creating the event bundle from the log event + + :param ThriftEvent event: The log event to be published + :return: void + """ + event_bundler = EventBundle() + ThriftPublisher.assign_attributes(event.metaData, event_bundler) + ThriftPublisher.assign_attributes(event.correlationData, event_bundler) + ThriftPublisher.assign_attributes(event.payloadData, event_bundler) + + self.__publisher.publish(event_bundler) + self.log.debug("Published event to thrift stream [%r]" % self.stream_id) + + def disconnect(self): + """ + Disconnect the thrift publisher + :return: void + """ + self.__publisher.disconnect() + + @staticmethod + def assign_attributes(attributes, event_bundler): + """ + Adds the given attributes to the given event bundler according to type of each attribute + :param list attributes: attributes to be assigned + :param EventBundle event_bundler: Event bundle to assign attributes to + :return: void + """ + + # __intAttributeList = [] + # __longAttributeList = [] + # __doubleAttributeList = [] + # __boolAttributeList = [] + # __stringAttributeList = [] + + if attributes is not None and len(attributes) > 0: + for attrib in attributes: + if isinstance(attrib, int): + event_bundler.addIntAttribute(attrib) + elif isinstance(attrib, long): + event_bundler.addLongAttribute(attrib) + elif isinstance(attrib, float): + event_bundler.addDoubleAttribute(attrib) + elif isinstance(attrib, bool): + event_bundler.addBoolAttribute(attrib) + elif isinstance(attrib, str): + event_bundler.addStringAttribute(attrib) + else: + ThriftPublisher.log.error("Undefined attribute type: %r" % attrib) + else: + ThriftPublisher.log.debug("Empty attribute list") http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/__init__.py new file mode 100644 index 0000000..2456923 --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/__init__.py new file mode 100644 index 0000000..adefd8e --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/__init__.py @@ -0,0 +1 @@ +__all__ = ['ttypes', 'constants'] http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/constants.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/constants.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/constants.py new file mode 100644 index 0000000..36943ba --- /dev/null +++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/thrift/gen/Data/constants.py @@ -0,0 +1,8 @@ +# +# Autogenerated by Thrift Compiler (0.9.1) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# +
