Adding integration test to PCA to validate Git artifact distribution
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/bbfa6c5b Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/bbfa6c5b Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/bbfa6c5b Branch: refs/heads/stratos-4.1.x Commit: bbfa6c5bb47ed2c644fe3be5d381682b7a683d5f Parents: ba20168 Author: Akila Perera <[email protected]> Authored: Fri Aug 28 02:09:45 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Fri Aug 28 02:09:45 2015 +0530 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/entity.py | 5 +- .../extensions/bash/CreateLVSDummyInterface.sh | 26 + .../extensions/bash/MemberInitializedEvent.sh | 26 + .../cartridge.agent/healthstats.py | 1 + .../modules/artifactmgt/git/agentgithandler.py | 26 +- .../cartridge.agent/modules/databridge/agent.py | 11 +- .../modules/event/eventhandler.py | 31 +- .../cartridge.agent/publisher.py | 1 - .../python-cartridge-agent/integration/pom.xml | 35 +- .../python.cartridge.agent/test/ADCTest.java | 241 +++++++- .../test/PythonCartridgeAgentTest.java | 578 ------------------- .../test/PythonTestManager.java | 411 +++++++++++++ .../test/StartUpTest.java | 171 ++++++ .../src/test/resources/pca-testing1.xml | 2 +- .../src/test/resources/suite-1/agent.conf | 2 +- .../resources/suite-1/payload/launch-params | 2 +- .../src/test/resources/suite-2/agent.conf | 4 +- .../resources/suite-2/payload/launch-params | 19 +- 18 files changed, 959 insertions(+), 633 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py index 3bb03e7..d02f670 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py @@ -16,7 +16,7 @@ # under the License. import constants - +import json class Topology: """ @@ -413,6 +413,9 @@ class Member: for port in ports: self.add_port(port) + def to_json(self): + return "{memberId: " + self.member_id + ", status: " + self.status + "}" + class KubernetesService: """ http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/extensions/bash/CreateLVSDummyInterface.sh ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/extensions/bash/CreateLVSDummyInterface.sh b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/extensions/bash/CreateLVSDummyInterface.sh new file mode 100644 index 0000000..0bdd6b0 --- /dev/null +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/extensions/bash/CreateLVSDummyInterface.sh @@ -0,0 +1,26 @@ +#!/bin/bash +# -------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# -------------------------------------------------------------- +# This extension script will be executed when a subscription domain added +# event is received by the cartridge agent. +# -------------------------------------------------------------- +# +echo `date`": Create LVS dummy interface shell extension executed" http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/extensions/bash/MemberInitializedEvent.sh ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/extensions/bash/MemberInitializedEvent.sh b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/extensions/bash/MemberInitializedEvent.sh new file mode 100755 index 0000000..4fc3da6 --- /dev/null +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/extensions/bash/MemberInitializedEvent.sh @@ -0,0 +1,26 @@ +#!/bin/bash +# -------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# -------------------------------------------------------------- +# This extension script will be executed when a subscription domain added +# event is received by the cartridge agent. +# -------------------------------------------------------------- +# +echo `date`": Member initialized event shell extension executed" http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py index be45294..dddee70 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py @@ -124,6 +124,7 @@ class HealthStatisticsPublisher: cep_admin_username, cep_admin_password, self.stream_definition) + self.publisher.start() HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py index bc82b35..22fe816 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py @@ -327,17 +327,20 @@ class AgentGitHandler: # This way, commit and push becomes an single operation. No intermediate state will be left behind. (init_head, init_errors) = AgentGitHandler.execute_git_command(["rev-parse", "HEAD"], git_repo.local_repo_path) - # check if modified - modified, unstaged_files = AgentGitHandler.get_unstaged_files(git_repo.local_repo_path) + # stage all untracked files + if AgentGitHandler.stage_all(git_repo.local_repo_path): + AgentGitHandler.log.info("Git staged untracked artifacts successfully") + else: + AgentGitHandler.log.info("Git could not stage untracked artifacts") - AgentGitHandler.log.debug("[Git] Modified: %s" % str(modified)) + # check if modified files are present + modified = AgentGitHandler.has_modified_files(git_repo.local_repo_path) + AgentGitHandler.log.debug("[Git] Modified: %s" % str(modified)) if not modified: AgentGitHandler.log.debug("No changes detected in the local repository for tenant %s" % git_repo.tenant_id) return - AgentGitHandler.stage_all(git_repo.local_repo_path) - # commit to local repositpory commit_message = "tenant [%s]'s artifacts committed to local repo at %s" \ % (git_repo.tenant_id, git_repo.local_repo_path) @@ -385,17 +388,14 @@ class AgentGitHandler: "Pushing artifacts to remote repository failed for tenant %s: %s" % (git_repo.tenant_id, e)) @staticmethod - def get_unstaged_files(repo_path): - + def has_modified_files(repo_path): (output, errors) = AgentGitHandler.execute_git_command(["status"], repo_path=repo_path) - unstaged_files = {"modified": [], "untracked": []} + AgentGitHandler.log.debug("Git status output: %s", str(output)) if "nothing to commit" in output: - return False, unstaged_files - - if "Changes not staged for commit" in output: - # there are modified files - return True, unstaged_files + return False + else: + return True @staticmethod def stage_all(repo_path): http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/agent.py index 5ef794c..a17a589 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/databridge/agent.py @@ -16,6 +16,7 @@ # under the License. from thrift.publisher import * +from threading import Thread from thrift.gen.Exception.ttypes import ThriftSessionExpiredException from ..util.log import * from exception import ThriftReceiverOfflineException @@ -114,7 +115,7 @@ class ThriftEvent: """:type : list[T]""" -class ThriftPublisher: +class ThriftPublisher(Thread): """ Handles publishing events to BAM/CEP through thrift using the provided address and credentials """ @@ -135,15 +136,14 @@ class ThriftPublisher: :return: ThriftPublisher object :rtype: ThriftPublisher """ + Thread.__init__(self) try: port_number = int(port) except ValueError: raise RuntimeError("Port number for Thrift Publisher is invalid: %r" % port) self.__publisher = Publisher(ip, port_number) - self.__publisher.connect(username, password) - self.__publisher.defineStream(str(stream_definition)) - + #self.__publisher.defineStream(str(stream_definition)) self.stream_definition = stream_definition self.stream_id = self.__publisher.streamId self.ip = ip @@ -151,6 +151,9 @@ class ThriftPublisher: self.username = username self.password = password + def run(self): + self.__publisher.connect(self.username, self.password) + def publish(self, event): """ Publishes the given event by creating the event bundle from the log event http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py index 9fe6ae9..9496951 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py @@ -91,6 +91,7 @@ class EventHandler: # checkout code subscribe_run, updated = AgentGitHandler.checkout(repo_info) + # execute artifact updated extension plugin_values = {"ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id, "ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id, @@ -99,7 +100,10 @@ class EventHandler: "ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username, "ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status} - self.execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values) + try: + self.execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values) + except ValueError: + self.__log.exception("Could not execute plugins for artifact updated event.") if subscribe_run: # publish instanceActivated @@ -109,10 +113,11 @@ class EventHandler: self.on_artifact_update_scheduler_event(tenant_id) update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, False) + auto_commit = Config.is_commits_enabled + auto_checkout = Config.is_checkout_enabled + self.__log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s", + update_artifacts, auto_commit, auto_checkout) if update_artifacts: - auto_commit = Config.is_commits_enabled - auto_checkout = Config.is_checkout_enabled - try: update_interval = int(Config.artifact_update_interval) except ValueError: @@ -207,6 +212,7 @@ class EventHandler: if member_exists: Config.initialized = True + self.markMemberAsInitialized(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) self.execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {}) @@ -276,7 +282,6 @@ class EventHandler: service_name_in_payload = Config.service_name cluster_id_in_payload = Config.cluster_id member_id_in_payload = Config.member_id - member_initialized = self.is_member_initialized_in_topology(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) @@ -476,6 +481,7 @@ class EventHandler: service = topology.get_service(service_name) cluster = service.get_cluster(cluster_id) found_member = cluster.get_member(member_id) + self.__log.debug("Found member: " + found_member.to_json()) if found_member.status == MemberStatus.Initialized: return True @@ -500,6 +506,21 @@ class EventHandler: return True + def markMemberAsInitialized(self, service_name, cluster_id, member_id): + topology = TopologyContext.get_topology() + service = topology.get_service(service_name) + if service is None: + self.__log.error("Service not found in topology [service] %s" % service_name) + return False + + cluster = service.get_cluster(cluster_id) + if cluster is None: + self.__log.error("Cluster id not found in topology [cluster] %s" % cluster_id) + return False + + member = cluster.get_member(member_id) + member.status = MemberStatus.Initialized + @staticmethod def add_common_input_values(plugin_values): """ http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py index 716df2d..d4365e8 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py @@ -116,7 +116,6 @@ def publish_instance_activated_event(health_stat_plugin): interval = interval_default else: interval = interval_default - health_stats_publisher = healthstats.HealthStatisticsPublisherManager(interval, health_stat_plugin) log.info("Starting Health statistics publisher with interval %r" % interval) health_stats_publisher.start() http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/products/python-cartridge-agent/integration/pom.xml ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/integration/pom.xml b/products/python-cartridge-agent/integration/pom.xml index e92bbb4..a359615 100755 --- a/products/python-cartridge-agent/integration/pom.xml +++ b/products/python-cartridge-agent/integration/pom.xml @@ -72,13 +72,34 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <inherited>false</inherited> - <configuration> - <suiteXmlFiles> - <suiteXmlFile>src/test/resources/pca-testing1.xml</suiteXmlFile> - </suiteXmlFiles> - <workingDirectory>${basedir}/target</workingDirectory> - </configuration> + <executions> + <execution> + <id>default-test</id> + <goals> + <goal>test</goal> + </goals> + <inherited>false</inherited> + <configuration> + <suiteXmlFiles> + <suiteXmlFile>src/test/resources/pca-testing1.xml</suiteXmlFile> + </suiteXmlFiles> + <workingDirectory>${basedir}/target</workingDirectory> + </configuration> + </execution> + <execution> + <id>adc-test</id> + <goals> + <goal>test</goal> + </goals> + <inherited>false</inherited> + <configuration> + <suiteXmlFiles> + <suiteXmlFile>src/test/resources/pca-testing2.xml</suiteXmlFile> + </suiteXmlFiles> + <workingDirectory>${basedir}/target</workingDirectory> + </configuration> + </execution> + </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java index 86fc7c3..e657ecc 100755 --- a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java +++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/ADCTest.java @@ -17,5 +17,244 @@ package org.apache.stratos.python.cartridge.agent.test;/* * under the License. */ -public class ADCTest { +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.domain.LoadBalancingIPType; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberInitializedEvent; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import static junit.framework.Assert.assertTrue; + +public class ADCTest extends PythonTestManager { + private static final Log log = LogFactory.getLog(ADCTest.class); + private static final int ADC_TIMEOUT = 180000; + private static final String RESOURCES_PATH = "/suite-2"; + private static final String APPLICATION_PATH = "/tmp/pca-test-suite-2"; + private static final String CLUSTER_ID = "tomcat.domain"; + private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-2"; + private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-2"; + private static final String APP_ID = "application-2"; + private static final String MEMBER_ID = "tomcat.member-1"; + private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1"; + private static final String NETWORK_PARTITION_ID = "network-partition-1"; + private static final String PARTITION_ID = "partition-1"; + private static final String TENANT_ID = "-1234"; + private static final String SERVICE_NAME = "tomcat"; + private static final String SOURCE_PATH = "/tmp/stratos-pca-adc-test-app-path/"; + + private static boolean hasADCTestCompleted = false; + + @BeforeSuite + public void setupStartUpTest() { + // Set jndi.properties.dir system property for initializing event publishers and receivers + System.setProperty("jndi.properties.dir", getResourcesPath(RESOURCES_PATH)); + + // start Python agent with configurations provided in resource path + setup(RESOURCES_PATH); + + // Simulate server socket + startServerSocket(8080); + } + + /** + * TearDown method for test method testPythonCartridgeAgent + */ + @AfterSuite + public void tearDownStartUpTest() { + // TODO: app path is duplicated in Java test and payload + tearDown(APPLICATION_PATH); + } + + + @Test(timeOut = ADC_TIMEOUT) + public void testPythonCartridgeAgent() { + Thread communicatorThread = new Thread(new Runnable() { + @Override + public void run() { + while (!eventReceiverInitiated) { + sleep(1000); + } + List<String> outputLines = new ArrayList<String>(); + while (!outputStream.isClosed()) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Exception in thread") || line.contains("ERROR")) { + try { + throw new RuntimeException(line); + } + catch (Exception e) { + log.error("ERROR found in PCA log", e); + } + } + if (line.contains("Subscribed to 'topology/#'")) { + sleep(2000); + // Send complete topology event + log.info("Publishing complete topology event..."); + Topology topology = createTestTopology(); + CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); + publishEvent(completeTopologyEvent); + log.info("Complete topology event published"); + + // Publish member initialized event + log.info("Publishing member initialized event..."); + MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent( + SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, + PARTITION_ID + ); + publishEvent(memberInitializedEvent); + log.info("Member initialized event published"); + } + + // Send artifact updated event to activate the instance first + if (line.contains("Artifact repository found")) { + publishEvent(getArtifactUpdatedEventForPrivateRepo()); + log.info("Artifact updated event published"); + } + log.info(line); + } + } + sleep(100); + } + } + }); + communicatorThread.start(); + + while (!instanceActivated) { + // wait until the instance activated event is received. + sleep(1000); + } + assertTrue("Instance started event was not received", instanceStarted); + assertTrue("Instance activated event was not received", instanceActivated); + + Thread adcTestThread = new Thread(new Runnable() { + @Override + public void run() { + log.info("Running ADC Test thread..."); + // Send artifact updated event + publishEvent(getArtifactUpdatedEventForPrivateRepo()); + log.info("Publishing artifact updated event for repo: " + + getArtifactUpdatedEventForPrivateRepo().getRepoURL()); + + List<String> outputLines = new ArrayList<String>(); + while (!outputStream.isClosed() && !hasADCTestCompleted) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Git clone executed")) { + log.info("Agent has completed git clone. Asserting the operation..."); + assertRepoClone(getArtifactUpdatedEventForPrivateRepo()); + File file = new File(APPLICATION_PATH + "/pca-live-" + UUID.randomUUID()); + try { + file.createNewFile(); + } + catch (IOException e) { + log.error("Could not create file", e); + } + } + if (line.contains("Pushed artifacts for tenant")) { + // TODO: Get rid of static var + log.info("ADC Test completed"); + hasADCTestCompleted = true; + } + } + } + sleep(100); + } + } + }); + adcTestThread.start(); + + while (!hasADCTestCompleted) { + // wait until the instance activated event is received. + sleep(1000); + } + assertTrue("ADC Test failed", hasADCTestCompleted); + } + + private void assertRepoClone(ArtifactUpdatedEvent artifactUpdatedEvent) { + File file = new File(APPLICATION_PATH + "/README.text"); + assertTrue("Git clone failed for repo [url] " + artifactUpdatedEvent.getRepoURL(), + file.exists()); + } + + private void assertRepoPush(ArtifactUpdatedEvent artifactUpdatedEvent) { + File file = new File(APPLICATION_PATH + "/test1.txt"); + assertTrue("Git clone failed for repo [url] " + artifactUpdatedEvent.getRepoURL(), file.exists()); + + } + + /** + * This method returns a collection of {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent} + * objects as parameters to the test + * + * @return + */ + public static ArtifactUpdatedEvent getArtifactUpdatedEventForPublicRepo() { + ArtifactUpdatedEvent publicRepoEvent = createTestArtifactUpdatedEvent(); + publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git"); + return publicRepoEvent; + } + + public static ArtifactUpdatedEvent getArtifactUpdatedEventForPrivateRepo() { + ArtifactUpdatedEvent privateRepoEvent = createTestArtifactUpdatedEvent(); + privateRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/testrepo.git"); + privateRepoEvent.setRepoUserName("testapache2211"); + privateRepoEvent.setRepoPassword("RExPDGa4GkPJj4kJDzSROQ=="); + return privateRepoEvent; + } + + /** + * Creates an {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent} object with a public + * repository URL + * + * @return + */ + private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() { + ArtifactUpdatedEvent artifactUpdatedEvent = new ArtifactUpdatedEvent(); + artifactUpdatedEvent.setClusterId(CLUSTER_ID); + artifactUpdatedEvent.setTenantId(TENANT_ID); + return artifactUpdatedEvent; + } + + /** + * Create test topology + * + * @return + */ + private Topology createTestTopology() { + Topology topology = new Topology(); + Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant); + topology.addService(service); + + Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME, + AUTOSCALING_POLICY_NAME, APP_ID); + service.addCluster(cluster); + + Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, + CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, + System.currentTimeMillis()); + + member.setDefaultPrivateIP("10.0.0.1"); + member.setDefaultPublicIP("20.0.0.1"); + Properties properties = new Properties(); + properties.setProperty("prop1", "value1"); + member.setProperties(properties); + member.setStatus(MemberStatus.Created); + cluster.addMember(member); + + return topology; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java deleted file mode 100755 index 93f4c1e..0000000 --- a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java +++ /dev/null @@ -1,578 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.python.cartridge.agent.test; - -import org.apache.activemq.broker.BrokerService; -import org.apache.commons.exec.*; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.output.ByteArrayOutputStream; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.domain.LoadBalancingIPType; -import org.apache.stratos.common.threading.StratosThreadPool; -import org.apache.stratos.messaging.broker.publish.EventPublisher; -import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.domain.topology.*; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; -import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; -import org.apache.stratos.messaging.event.topology.MemberInitializedEvent; -import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener; -import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener; -import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver; -import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; -import org.apache.stratos.messaging.util.MessagingUtil; -import org.testng.annotations.AfterSuite; -import org.testng.annotations.BeforeSuite; -import org.testng.annotations.Test; - -import java.io.*; -import java.net.ServerSocket; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; - -import static junit.framework.Assert.assertTrue; - -public class PythonCartridgeAgentTest { - - private static final Log log = LogFactory.getLog(PythonCartridgeAgentTest.class); - private static final String RESOURCES_PATH = "/suite-1"; - private static final String NEW_LINE = System.getProperty("line.separator"); - // private static final long TIMEOUT = 1440000; - private static final long TIMEOUT = 120000; - private static final String CLUSTER_ID = "php.php.domain"; - private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-1"; - private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-1"; - private static final String APP_ID = "application-1"; - private static final String MEMBER_ID = "php.member-1"; - private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1"; - private static final String NETWORK_PARTITION_ID = "network-partition-1"; - private static final String PARTITION_ID = "partition-1"; - private static final String TENANT_ID = "-1234"; - private static final String SERVICE_NAME = "php"; - public static final String SOURCE_PATH = "/tmp/stratos-pca-test-app-path/"; - - private static List<ServerSocket> serverSocketList; - private static Map<String, Executor> executorList; - - private boolean instanceStarted; - private boolean instanceActivated; - private ByteArrayOutputStreamLocal outputStream; - private boolean eventReceiverInitiated = false; - private TopologyEventReceiver topologyEventReceiver; - private InstanceStatusEventReceiver instanceStatusEventReceiver; - private BrokerService broker = new BrokerService(); - private static final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID(); - - private static final String ACTIVEMQ_AMQP_BIND_ADDRESS = "activemq.amqp.bind.address"; - private static final String ACTIVEMQ_MQTT_BIND_ADDRESS = "activemq.mqtt.bind.address"; - private static final String CEP_PORT = "cep.port"; - private static final String DISTRIBUTION_NAME = "distribution.name"; - - private int cepPort; - private String amqpBindAddress; - private String mqttBindAddress; - private String distributionName; - private Properties integrationProperties; - - public PythonCartridgeAgentTest() { - if (integrationProperties == null) { - integrationProperties = new Properties(); - try { - integrationProperties - .load(PythonCartridgeAgentTest.class.getResourceAsStream("/integration-test.properties")); - distributionName = integrationProperties.getProperty(DISTRIBUTION_NAME); - amqpBindAddress = integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_ADDRESS); - mqttBindAddress = integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_ADDRESS); - cepPort = Integer.parseInt(integrationProperties.getProperty(CEP_PORT)); - log.info("PCA integration properties: " + integrationProperties.toString()); - } - catch (IOException e) { - log.error("Error loading integration-test.properties file from classpath. Please make sure that file " + - "exists in classpath.", e); - } - } - } - - /** - * Setup method for test class - */ - @BeforeSuite - public static void oneTimeSetUp() { - // Set jndi.properties.dir system property for initializing event publishers and receivers - System.setProperty("jndi.properties.dir", getResourcesPath()); - } - - /** - * Setup method for test method testPythonCartridgeAgent - */ - @BeforeSuite - public void setup() { - serverSocketList = new ArrayList<ServerSocket>(); - executorList = new HashMap<String, Executor>(); - try { - broker.addConnector(amqpBindAddress); - broker.addConnector(mqttBindAddress); - broker.setBrokerName("testBroker"); - broker.setDataDirectory( - PythonCartridgeAgentTest.class.getResource("/").getPath() + "/../" + PYTHON_AGENT_DIR_NAME + - "/activemq-data"); - broker.start(); - log.info("Broker service started!"); - } - catch (Exception e) { - log.error("Error while setting up broker service", e); - } - if (!this.eventReceiverInitiated) { - ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 15); - topologyEventReceiver = new TopologyEventReceiver(); - topologyEventReceiver.setExecutorService(executorService); - topologyEventReceiver.execute(); - - instanceStatusEventReceiver = new InstanceStatusEventReceiver(); - instanceStatusEventReceiver.setExecutorService(executorService); - instanceStatusEventReceiver.execute(); - - this.instanceStarted = false; - instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("Instance started event received"); - instanceStarted = true; - } - }); - - this.instanceActivated = false; - instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("Instance activated event received"); - instanceActivated = true; - } - }); - - this.eventReceiverInitiated = true; - } - // Simulate CEP server socket - startServerSocket(cepPort); - String agentPath = setupPythonAgent(); - log.info("Python agent working directory name: " + PYTHON_AGENT_DIR_NAME); - log.info("Starting python cartridge agent..."); - this.outputStream = executeCommand("python " + agentPath + "/agent.py > " + - PythonCartridgeAgentTest.class.getResource(File.separator).getPath() + "/../" + PYTHON_AGENT_DIR_NAME + - "/cartridge-agent.log"); - } - - /** - * TearDown method for test method testPythonCartridgeAgent - */ - @AfterSuite - public void tearDown() { - for (Map.Entry<String, Executor> entry : executorList.entrySet()) { - try { - String commandText = entry.getKey(); - Executor executor = entry.getValue(); - ExecuteWatchdog watchdog = executor.getWatchdog(); - if (watchdog != null) { - log.info("Terminating process: " + commandText); - watchdog.destroyProcess(); - } - } - catch (Exception ignore) { - } - } - for (ServerSocket serverSocket : serverSocketList) { - try { - log.info("Stopping socket server: " + serverSocket.getLocalSocketAddress()); - serverSocket.close(); - } - catch (IOException ignore) { - } - } - - try { - log.info("Deleting source checkout folder..."); - FileUtils.deleteDirectory(new File(SOURCE_PATH)); - } - catch (Exception ignore) { - - } - - this.instanceStatusEventReceiver.terminate(); - this.topologyEventReceiver.terminate(); - - this.instanceActivated = false; - this.instanceStarted = false; - try { - broker.stop(); - } - catch (Exception e) { - log.error("Error while stopping the broker service", e); - } - } - - - /** - * This method returns a collection of {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent} - * objects as parameters to the test - * - * @return - */ - - public static ArrayList<ArtifactUpdatedEvent> getArtifactUpdatedEventsAsParams() { - ArtifactUpdatedEvent publicRepoEvent = createTestArtifactUpdatedEvent(); - - ArtifactUpdatedEvent privateRepoEvent = createTestArtifactUpdatedEvent(); - privateRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/testrepo.git"); - privateRepoEvent.setRepoUserName("testapache2211"); - privateRepoEvent.setRepoPassword("RExPDGa4GkPJj4kJDzSROQ=="); - - ArtifactUpdatedEvent privateRepoEvent2 = createTestArtifactUpdatedEvent(); - privateRepoEvent2.setRepoURL("https://[email protected]/testapache2211/testrepo.git"); - privateRepoEvent2.setRepoUserName("testapache2211"); - privateRepoEvent2.setRepoPassword("iF7qT+BKKPE3PGV1TeDsJA=="); - - ArrayList<ArtifactUpdatedEvent> list = new ArrayList<ArtifactUpdatedEvent>(); - list.add(privateRepoEvent); - list.add(privateRepoEvent2); - list.add(publicRepoEvent); - return list; - } - - /** - * Creates an {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent} object with a public - * repository URL - * - * @return - */ - private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() { - ArtifactUpdatedEvent publicRepoEvent = new ArtifactUpdatedEvent(); - publicRepoEvent.setClusterId(CLUSTER_ID); - publicRepoEvent.setTenantId(TENANT_ID); - publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git"); - publicRepoEvent.setCommitEnabled(true); - return publicRepoEvent; - } - - @Test(timeOut = TIMEOUT) - public void testPythonCartridgeAgent() { - Thread communicatorThread = new Thread(new Runnable() { - @Override - public void run() { - List<String> outputLines = new ArrayList<String>(); - while (!outputStream.isClosed()) { - List<String> newLines = getNewLines(outputLines, outputStream.toString()); - if (newLines.size() > 0) { - for (String line : newLines) { - if (line.contains("Subscribed to 'topology/#'")) { - sleep(1000); - // Send complete topology event - log.info("Publishing complete topology event..."); - Topology topology = createTestTopology(); - CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); - publishEvent(completeTopologyEvent); - log.info("Complete topology event published"); - - sleep(3000); - // Publish member initialized event - log.info("Publishing member initialized event..."); - MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent( - SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, - PARTITION_ID - ); - publishEvent(memberInitializedEvent); - log.info("Member initialized event published"); - - // Simulate server socket - startServerSocket(8080); - } - if (line.contains("Artifact repository found")) { - // Send artifact updated event - ArrayList<ArtifactUpdatedEvent> list = getArtifactUpdatedEventsAsParams(); - for (ArtifactUpdatedEvent artifactUpdatedEvent : list) { - publishEvent(artifactUpdatedEvent); - } - } - - if (line.contains("Exception in thread") || line.contains("ERROR")) { - //throw new RuntimeException(line); - } - log.info(line); - } - } - sleep(100); - } - } - }); - - communicatorThread.start(); - - while (!instanceActivated) { - // wait until the instance activated event is received. - sleep(2000); - } - - assertTrue("Instance started event was not received", instanceStarted); - assertTrue("Instance activated event was not received", instanceActivated); - } - - /** - * Publish messaging event - * - * @param event - */ - private void publishEvent(Event event) { - String topicName = MessagingUtil.getMessageTopicName(event); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(topicName); - eventPublisher.publish(event); - } - - /** - * Start server socket - * - * @param port - */ - private void startServerSocket(final int port) { - Thread socketThread = new Thread(new Runnable() { - @Override - public void run() { - try { - ServerSocket serverSocket = new ServerSocket(port); - serverSocketList.add(serverSocket); - log.info("Server socket started on port: " + port); - serverSocket.accept(); - } - catch (IOException e) { - String message = "Could not start server socket: [port] " + port; - log.error(message, e); - throw new RuntimeException(message, e); - } - } - }); - socketThread.start(); - } - - /** - * Create test topology - * - * @return - */ - private Topology createTestTopology() { - Topology topology = new Topology(); - Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant); - topology.addService(service); - - Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME, - AUTOSCALING_POLICY_NAME, APP_ID); - service.addCluster(cluster); - - Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, - CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, - System.currentTimeMillis()); - - member.setDefaultPrivateIP("10.0.0.1"); - member.setDefaultPublicIP("20.0.0.1"); - Properties properties = new Properties(); - properties.setProperty("prop1", "value1"); - member.setProperties(properties); - member.setStatus(MemberStatus.Created); - cluster.addMember(member); - - return topology; - } - - /** - * Return new lines found in the output - * - * @param currentOutputLines current output lines - * @param output output - * @return - */ - private List<String> getNewLines(List<String> currentOutputLines, String output) { - List<String> newLines = new ArrayList<String>(); - - if (StringUtils.isNotBlank(output)) { - String[] lines = output.split(NEW_LINE); - for (String line : lines) { - if (!currentOutputLines.contains(line)) { - currentOutputLines.add(line); - newLines.add(line); - } - } - } - return newLines; - } - - public static String getResourcesPath() { - return PythonCartridgeAgentTest.class.getResource("/").getPath() + "/../../src/test/resources" + RESOURCES_PATH; - } - - /** - * Sleep current thread - * - * @param time - */ - private void sleep(long time) { - try { - Thread.sleep(time); - } - catch (InterruptedException ignore) { - } - } - - /** - * Copy python agent distribution to a new folder, extract it and copy sample configuration files - * - * @return - */ - private String setupPythonAgent() { - try { - log.info("Setting up python cartridge agent..."); - - - String srcAgentPath = PythonCartridgeAgentTest.class.getResource("/").getPath() + - "/../../../distribution/target/" + distributionName + ".zip"; - String unzipDestPath = - PythonCartridgeAgentTest.class.getResource("/").getPath() + "/../" + PYTHON_AGENT_DIR_NAME + "/"; - //FileUtils.copyFile(new File(srcAgentPath), new File(destAgentPath)); - unzip(srcAgentPath, unzipDestPath); - String destAgentPath = PythonCartridgeAgentTest.class.getResource("/").getPath() + "/../" + - PYTHON_AGENT_DIR_NAME + "/" + distributionName; - - String srcAgentConfPath = getResourcesPath() + "/agent.conf"; - String destAgentConfPath = destAgentPath + "/agent.conf"; - FileUtils.copyFile(new File(srcAgentConfPath), new File(destAgentConfPath)); - - String srcLoggingIniPath = getResourcesPath() + "/logging.ini"; - String destLoggingIniPath = destAgentPath + "/logging.ini"; - FileUtils.copyFile(new File(srcLoggingIniPath), new File(destLoggingIniPath)); - - String srcPayloadPath = getResourcesPath() + "/payload"; - String destPayloadPath = destAgentPath + "/payload"; - FileUtils.copyDirectory(new File(srcPayloadPath), new File(destPayloadPath)); - - log.info("Changing extension scripts permissions"); - File extensionsPath = new File(destAgentPath + "/extensions/bash"); - File[] extensions = extensionsPath.listFiles(); - for (File extension : extensions) { - extension.setExecutable(true); - } - - log.info("Python cartridge agent setup completed"); - - return destAgentPath; - } - catch (Exception e) { - String message = "Could not copy cartridge agent distribution"; - log.error(message, e); - throw new RuntimeException(message, e); - } - } - - public void unzip(String zipFilePath, String destDirectory) throws IOException { - File destDir = new File(destDirectory); - if (!destDir.exists()) { - destDir.mkdir(); - } - ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath)); - ZipEntry entry = zipIn.getNextEntry(); - // iterates over entries in the zip file - while (entry != null) { - String filePath = destDirectory + File.separator + entry.getName(); - if (!entry.isDirectory()) { - // if the entry is a file, extracts it - extractFile(zipIn, filePath); - } else { - // if the entry is a directory, make the directory - File dir = new File(filePath); - dir.mkdir(); - } - zipIn.closeEntry(); - entry = zipIn.getNextEntry(); - } - zipIn.close(); - } - - private void extractFile(ZipInputStream zipIn, String filePath) throws IOException { - BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath)); - byte[] bytesIn = new byte[4096]; - int read = 0; - while ((read = zipIn.read(bytesIn)) != -1) { - bos.write(bytesIn, 0, read); - } - bos.close(); - } - - /** - * Execute shell command - * - * @param commandText - */ - private ByteArrayOutputStreamLocal executeCommand(final String commandText) { - final ByteArrayOutputStreamLocal outputStream = new ByteArrayOutputStreamLocal(); - try { - CommandLine commandline = CommandLine.parse(commandText); - DefaultExecutor exec = new DefaultExecutor(); - PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream); - exec.setWorkingDirectory(new File( - PythonCartridgeAgentTest.class.getResource("/").getPath() + "/../" + PYTHON_AGENT_DIR_NAME)); - exec.setStreamHandler(streamHandler); - ExecuteWatchdog watchdog = new ExecuteWatchdog(TIMEOUT); - exec.setWatchdog(watchdog); - exec.execute(commandline, new ExecuteResultHandler() { - @Override - public void onProcessComplete(int i) { - log.info(commandText + " process completed"); - } - - @Override - public void onProcessFailed(ExecuteException e) { - log.error(commandText + " process failed", e); - } - }); - executorList.put(commandText, exec); - return outputStream; - } - catch (Exception e) { - log.error(outputStream.toString(), e); - throw new RuntimeException(e); - } - } - - /** - * Implements ByteArrayOutputStream.isClosed() method - */ - private class ByteArrayOutputStreamLocal extends ByteArrayOutputStream { - private boolean closed; - - @Override - public void close() throws IOException { - super.close(); - closed = true; - } - - public boolean isClosed() { - return closed; - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonTestManager.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonTestManager.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonTestManager.java new file mode 100644 index 0000000..68cca0c --- /dev/null +++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonTestManager.java @@ -0,0 +1,411 @@ +package org.apache.stratos.python.cartridge.agent.test;/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.exec.*; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener; +import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener; +import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.util.MessagingUtil; + +import java.io.*; +import java.net.ServerSocket; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +public class PythonTestManager { + protected final Properties integrationProperties = new Properties(); + private static final Log log = LogFactory.getLog(PythonTestManager.class); + protected BrokerService broker = new BrokerService(); + + public final long TIMEOUT = 180000; + public static final String NEW_LINE = System.getProperty("line.separator"); + public static final String ACTIVEMQ_AMQP_BIND_ADDRESS = "activemq.amqp.bind.address"; + public static final String ACTIVEMQ_MQTT_BIND_ADDRESS = "activemq.mqtt.bind.address"; + public static final String CEP_PORT = "cep.port"; + public static final String DISTRIBUTION_NAME = "distribution.name"; + protected final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID(); + + protected List<ServerSocket> serverSocketList = new ArrayList<ServerSocket>(); + protected Map<String, Executor> executorList = new HashMap<String, Executor>(); + + protected int cepPort; + protected String amqpBindAddress; + protected String mqttBindAddress; + protected String distributionName; + + protected boolean eventReceiverInitiated = false; + protected TopologyEventReceiver topologyEventReceiver; + protected InstanceStatusEventReceiver instanceStatusEventReceiver; + protected boolean instanceStarted; + protected boolean instanceActivated; + protected ByteArrayOutputStreamLocal outputStream; + + /** + * Setup method for test method testPythonCartridgeAgent + */ + protected void setup(String resourcePath) { + try { + startBroker(); + } + catch (Exception e) { + log.error("Error while starting MB", e); + return; + } + if (!this.eventReceiverInitiated) { + ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 15); + topologyEventReceiver = new TopologyEventReceiver(); + topologyEventReceiver.setExecutorService(executorService); + topologyEventReceiver.execute(); + + instanceStatusEventReceiver = new InstanceStatusEventReceiver(); + instanceStatusEventReceiver.setExecutorService(executorService); + instanceStatusEventReceiver.execute(); + + this.instanceStarted = false; + instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() { + @Override + protected void onEvent(Event event) { + log.info("Instance started event received"); + instanceStarted = true; + } + }); + + this.instanceActivated = false; + instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() { + @Override + protected void onEvent(Event event) { + log.info("Instance activated event received"); + instanceActivated = true; + } + }); + + this.eventReceiverInitiated = true; + } + // Simulate CEP server socket + startServerSocket(cepPort); + String agentPath = setupPythonAgent(resourcePath); + log.info("Python agent working directory name: " + PYTHON_AGENT_DIR_NAME); + log.info("Starting python cartridge agent..."); + this.outputStream = executeCommand("python " + agentPath + "/agent.py > " + + PythonTestManager.class.getResource(File.separator).getPath() + "/../" + PYTHON_AGENT_DIR_NAME + + "/cartridge-agent-console.log"); + } + + protected void tearDown() { + tearDown(null); + } + + /** + * TearDown method for test method testPythonCartridgeAgent + */ + protected void tearDown(String sourcePath) { + for (Map.Entry<String, Executor> entry : executorList.entrySet()) { + try { + String commandText = entry.getKey(); + Executor executor = entry.getValue(); + log.info("Terminating process: " + commandText); + executor.setExitValue(0); + executor.getWatchdog().destroyProcess(); + } + catch (Exception ignore) { + } + } + for (ServerSocket serverSocket : serverSocketList) { + try { + log.info("Stopping socket server: " + serverSocket.getLocalSocketAddress()); + serverSocket.close(); + } + catch (IOException ignore) { + } + } + + try { + log.info("Deleting source checkout folder..."); + FileUtils.deleteDirectory(new File(sourcePath)); + } + catch (Exception ignore) { + } + + this.instanceStatusEventReceiver.terminate(); + this.topologyEventReceiver.terminate(); + + this.instanceActivated = false; + this.instanceStarted = false; + // wait until everything cleans up to avoid connection errors + sleep(1000); + try { + broker.stop(); + } + catch (Exception e) { + log.error("Error while stopping the broker service", e); + } + } + + public PythonTestManager() { + try { + integrationProperties + .load(PythonTestManager.class.getResourceAsStream("/integration-test.properties")); + distributionName = integrationProperties.getProperty(DISTRIBUTION_NAME); + amqpBindAddress = integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_ADDRESS); + mqttBindAddress = integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_ADDRESS); + cepPort = Integer.parseInt(integrationProperties.getProperty(CEP_PORT)); + log.info("PCA integration properties: " + integrationProperties.toString()); + } + catch (IOException e) { + log.error("Error loading integration-test.properties file from classpath. Please make sure that file " + + "exists in classpath.", e); + } + } + + protected void startBroker() throws Exception { + broker.addConnector(amqpBindAddress); + broker.addConnector(mqttBindAddress); + broker.setBrokerName("testBroker"); + broker.setDataDirectory( + PythonTestManager.class.getResource("/").getPath() + "/../" + PYTHON_AGENT_DIR_NAME + + "/activemq-data"); + broker.start(); + log.info("Broker service started!"); + } + + /** + * Start server socket + * + * @param port + */ + protected void startServerSocket(final int port) { + Thread socketThread = new Thread(new Runnable() { + @Override + public void run() { + try { + ServerSocket serverSocket = new ServerSocket(port); + serverSocketList.add(serverSocket); + log.info("Server socket started on port: " + port); + serverSocket.accept(); + } + catch (IOException e) { + String message = "Could not start server socket: [port] " + port; + log.error(message, e); + throw new RuntimeException(message, e); + } + } + }); + socketThread.start(); + } + + + protected static String getResourcesPath(String resourcesPath) { + return PythonTestManager.class.getResource("/").getPath() + "/../../src/test/resources" + resourcesPath; + } + + /** + * Copy python agent distribution to a new folder, extract it and copy sample configuration files + * + * @return + */ + protected String setupPythonAgent(String resourcesPath) { + try { + log.info("Setting up python cartridge agent..."); + + + String srcAgentPath = PythonTestManager.class.getResource("/").getPath() + + "/../../../distribution/target/" + distributionName + ".zip"; + String unzipDestPath = + PythonTestManager.class.getResource("/").getPath() + "/../" + PYTHON_AGENT_DIR_NAME + "/"; + //FileUtils.copyFile(new File(srcAgentPath), new File(destAgentPath)); + unzip(srcAgentPath, unzipDestPath); + String destAgentPath = PythonTestManager.class.getResource("/").getPath() + "/../" + + PYTHON_AGENT_DIR_NAME + "/" + distributionName; + + String srcAgentConfPath = getResourcesPath(resourcesPath) + "/agent.conf"; + String destAgentConfPath = destAgentPath + "/agent.conf"; + FileUtils.copyFile(new File(srcAgentConfPath), new File(destAgentConfPath)); + + String srcLoggingIniPath = getResourcesPath(resourcesPath) + "/logging.ini"; + String destLoggingIniPath = destAgentPath + "/logging.ini"; + FileUtils.copyFile(new File(srcLoggingIniPath), new File(destLoggingIniPath)); + + String srcPayloadPath = getResourcesPath(resourcesPath) + "/payload"; + String destPayloadPath = destAgentPath + "/payload"; + FileUtils.copyDirectory(new File(srcPayloadPath), new File(destPayloadPath)); + + log.info("Changing extension scripts permissions"); + File extensionsPath = new File(destAgentPath + "/extensions/bash"); + File[] extensions = extensionsPath.listFiles(); + for (File extension : extensions) { + extension.setExecutable(true); + } + + log.info("Python cartridge agent setup completed"); + + return destAgentPath; + } + catch (Exception e) { + String message = "Could not copy cartridge agent distribution"; + log.error(message, e); + throw new RuntimeException(message, e); + } + } + + private void unzip(String zipFilePath, String destDirectory) throws IOException { + File destDir = new File(destDirectory); + if (!destDir.exists()) { + destDir.mkdir(); + } + ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath)); + ZipEntry entry = zipIn.getNextEntry(); + // iterates over entries in the zip file + while (entry != null) { + String filePath = destDirectory + File.separator + entry.getName(); + if (!entry.isDirectory()) { + // if the entry is a file, extracts it + extractFile(zipIn, filePath); + } else { + // if the entry is a directory, make the directory + File dir = new File(filePath); + dir.mkdir(); + } + zipIn.closeEntry(); + entry = zipIn.getNextEntry(); + } + zipIn.close(); + } + + private void extractFile(ZipInputStream zipIn, String filePath) throws IOException { + BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath)); + byte[] bytesIn = new byte[4096]; + int read = 0; + while ((read = zipIn.read(bytesIn)) != -1) { + bos.write(bytesIn, 0, read); + } + bos.close(); + } + + /** + * Execute shell command + * + * @param commandText + */ + protected ByteArrayOutputStreamLocal executeCommand(final String commandText) { + final ByteArrayOutputStreamLocal outputStream = new ByteArrayOutputStreamLocal(); + try { + CommandLine commandline = CommandLine.parse(commandText); + DefaultExecutor exec = new DefaultExecutor(); + PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream); + exec.setWorkingDirectory(new File( + PythonTestManager.class.getResource("/").getPath() + "/../" + PYTHON_AGENT_DIR_NAME)); + exec.setStreamHandler(streamHandler); + ExecuteWatchdog watchdog = new ExecuteWatchdog(TIMEOUT); + exec.setWatchdog(watchdog); + exec.execute(commandline, new ExecuteResultHandler() { + @Override + public void onProcessComplete(int i) { + log.info(commandText + " process completed"); + } + + @Override + public void onProcessFailed(ExecuteException e) { + log.error(commandText + " process failed", e); + } + }); + executorList.put(commandText, exec); + return outputStream; + } + catch (Exception e) { + log.error(outputStream.toString(), e); + throw new RuntimeException(e); + } + } + + /** + * Sleep current thread + * + * @param time + */ + protected void sleep(long time) { + try { + Thread.sleep(time); + } + catch (InterruptedException ignore) { + } + } + + /** + * Return new lines found in the output + * + * @param currentOutputLines current output lines + * @param output output + * @return + */ + protected List<String> getNewLines(List<String> currentOutputLines, String output) { + List<String> newLines = new ArrayList<String>(); + + if (StringUtils.isNotBlank(output)) { + String[] lines = output.split(NEW_LINE); + for (String line : lines) { + if (!currentOutputLines.contains(line)) { + currentOutputLines.add(line); + newLines.add(line); + } + } + } + return newLines; + } + + /** + * Publish messaging event + * + * @param event + */ + protected void publishEvent(Event event) { + String topicName = MessagingUtil.getMessageTopicName(event); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topicName); + eventPublisher.publish(event); + } + + + /** + * Implements ByteArrayOutputStream.isClosed() method + */ + protected class ByteArrayOutputStreamLocal extends org.apache.commons.io.output.ByteArrayOutputStream { + private boolean closed; + + @Override + public void close() throws IOException { + super.close(); + closed = true; + } + + public boolean isClosed() { + return closed; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/StartUpTest.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/StartUpTest.java b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/StartUpTest.java new file mode 100755 index 0000000..36a8cd5 --- /dev/null +++ b/products/python-cartridge-agent/integration/src/test/java/org/apache/stratos/python.cartridge.agent/test/StartUpTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.python.cartridge.agent.test; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.domain.LoadBalancingIPType; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberInitializedEvent; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static junit.framework.Assert.assertTrue; + +public class StartUpTest extends PythonTestManager { + private static final Log log = LogFactory.getLog(StartUpTest.class); + private static final int STARTUP_TIMEOUT = 30000; + private static final String RESOURCES_PATH = "/suite-1"; + private static final String CLUSTER_ID = "php.php.domain"; + private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-1"; + private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-1"; + private static final String APP_ID = "application-1"; + private static final String MEMBER_ID = "php.member-1"; + private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1"; + private static final String NETWORK_PARTITION_ID = "network-partition-1"; + private static final String PARTITION_ID = "partition-1"; + private static final String TENANT_ID = "-1234"; + private static final String SERVICE_NAME = "php"; + private static final String SOURCE_PATH = "/tmp/stratos-pca-startup-test-app-path/"; + + + @BeforeSuite + public void setupStartUpTest() { + // Set jndi.properties.dir system property for initializing event publishers and receivers + System.setProperty("jndi.properties.dir", getResourcesPath(RESOURCES_PATH)); + + // start Python agent with configurations provided in resource path + setup(RESOURCES_PATH); + } + + + /** + * TearDown method for test method testPythonCartridgeAgent + */ + @AfterSuite + public void tearDownStartUpTest() { + tearDown(); + } + + @Test(timeOut = STARTUP_TIMEOUT) + public void testPythonCartridgeAgent() { + Thread communicatorThread = new Thread(new Runnable() { + @Override + public void run() { + while (!eventReceiverInitiated) { + sleep(2000); + } + List<String> outputLines = new ArrayList<String>(); + while (!outputStream.isClosed()) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Exception in thread") || line.contains("ERROR")) { + try { + throw new RuntimeException(line); + } + catch (Exception e) { + log.error("ERROR found in PCA log", e); + } + } + if (line.contains("Subscribed to 'topology/#'")) { + sleep(2000); + // Send complete topology event + log.info("Publishing complete topology event..."); + Topology topology = createTestTopology(); + CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); + publishEvent(completeTopologyEvent); + log.info("Complete topology event published"); + + // Publish member initialized event + log.info("Publishing member initialized event..."); + MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent( + SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, + PARTITION_ID + ); + publishEvent(memberInitializedEvent); + log.info("Member initialized event published"); + + // Simulate server socket + startServerSocket(8080); + } + /* + if (line.contains("Artifact repository found")) { + // Send artifact updated event + ArrayList<ArtifactUpdatedEvent> list = getArtifactUpdatedEventsAsParams(); + for (ArtifactUpdatedEvent artifactUpdatedEvent : list) { + publishEvent(artifactUpdatedEvent); + } + }*/ + log.info(line); + } + } + sleep(100); + } + } + }); + + communicatorThread.start(); + + while (!instanceActivated) { + // wait until the instance activated event is received. + sleep(2000); + } + + assertTrue("Instance started event was not received", instanceStarted); + assertTrue("Instance activated event was not received", instanceActivated); + } + + + /** + * Create test topology + * + * @return + */ + private Topology createTestTopology() { + Topology topology = new Topology(); + Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant); + topology.addService(service); + + Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME, + AUTOSCALING_POLICY_NAME, APP_ID); + service.addCluster(cluster); + + Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, + CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, + System.currentTimeMillis()); + + member.setDefaultPrivateIP("10.0.0.1"); + member.setDefaultPublicIP("20.0.0.1"); + Properties properties = new Properties(); + properties.setProperty("prop1", "value1"); + member.setProperties(properties); + member.setStatus(MemberStatus.Created); + cluster.addMember(member); + + return topology; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/products/python-cartridge-agent/integration/src/test/resources/pca-testing1.xml ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/integration/src/test/resources/pca-testing1.xml b/products/python-cartridge-agent/integration/src/test/resources/pca-testing1.xml index a13f950..35e60e3 100755 --- a/products/python-cartridge-agent/integration/src/test/resources/pca-testing1.xml +++ b/products/python-cartridge-agent/integration/src/test/resources/pca-testing1.xml @@ -23,7 +23,7 @@ <suite name="PythonCartridgeAgentIntegrationSuite1"> <test name="PCATest"> <classes> - <class name="org.apache.stratos.python.cartridge.agent.test.PythonCartridgeAgentTest"/> + <class name="org.apache.stratos.python.cartridge.agent.test.StartUpTest"/> </classes> </test> </suite> http://git-wip-us.apache.org/repos/asf/stratos/blob/bbfa6c5b/products/python-cartridge-agent/integration/src/test/resources/suite-1/agent.conf ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/integration/src/test/resources/suite-1/agent.conf b/products/python-cartridge-agent/integration/src/test/resources/suite-1/agent.conf index 136433d..dc34a1f 100755 --- a/products/python-cartridge-agent/integration/src/test/resources/suite-1/agent.conf +++ b/products/python-cartridge-agent/integration/src/test/resources/suite-1/agent.conf @@ -40,7 +40,7 @@ monitoring.server.secure.port =7712 monitoring.server.admin.username =admin monitoring.server.admin.password =admin #log.file.paths =/home/chamilad/dev/wso2esb-4.8.1/repository/logs/wso2carbon.log -log.file.paths =/tmp/agent.screen.log +log.file.paths =/tmp/agent.screen-startup-test.log metadata.service.url =https://localhost:9443 super.tenant.repository.path =/repository/deployment/server/ tenant.repository.path =/repository/tenants/ \ No newline at end of file
