http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java index 9376fd7..e56d7bf 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java @@ -40,18 +40,14 @@ public class ApplicationInstanceCreatedMessageProcessor extends MessageProcessor @Override public boolean process(String type, String message, Object object) { - Applications applications = (Applications) object; - if (ApplicationInstanceCreatedEvent.class.getName().equals(type)) { - if (!applications.isInitialized()) { return false; } - - - ApplicationInstanceCreatedEvent event = (ApplicationInstanceCreatedEvent) MessagingUtil.jsonToObject(message, - ApplicationInstanceCreatedEvent.class); + ApplicationInstanceCreatedEvent event = + (ApplicationInstanceCreatedEvent) MessagingUtil.jsonToObject(message, + ApplicationInstanceCreatedEvent.class); if (event == null) { log.error("Unable to convert the JSON message to ApplicationInstanceCreatedEvent"); return false; @@ -60,17 +56,18 @@ public class ApplicationInstanceCreatedMessageProcessor extends MessageProcessor ApplicationsUpdater.acquireWriteLockForApplications(); try { return doProcess(event, applications); - - } finally { + } + finally { ApplicationsUpdater.releaseWriteLockForApplications(); } - } else { if (nextProcessor != null) { // ask the next processor to take care of the message. return nextProcessor.process(type, message, applications); } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + throw new RuntimeException(String.format( + "Failed to process message using available message processors: [type] %s [body] %s", type, + message)); } } } @@ -78,13 +75,18 @@ public class ApplicationInstanceCreatedMessageProcessor extends MessageProcessor private boolean doProcess(ApplicationInstanceCreatedEvent event, Applications applications) { // check if required properties are available - if (event.getApplicationInstance() == null) { - String errorMsg = "Application instance object of application instance created event is invalid"; + if (event.getApplicationInstance() == null || event.getApplicationId() == null) { + String errorMsg = "Application instance object of ApplicationInstanceCreatedEvent is invalid. " + + "[ApplicationId] " + event.getApplicationId() + ", [ApplicationInstance] " + + event.getApplicationInstance(); + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + if (applications == null) { + String errorMsg = "Error! Applications object is null"; log.error(errorMsg); throw new RuntimeException(errorMsg); - } - ApplicationInstance applicationInstance = event.getApplicationInstance(); if (applicationInstance.getInstanceId() == null || applicationInstance.getInstanceId().isEmpty()) { @@ -96,16 +98,14 @@ public class ApplicationInstanceCreatedMessageProcessor extends MessageProcessor // check if an Application instance with same name exists in applications instance if (null != applications.getApplication(event.getApplicationId()). - getInstanceByNetworkPartitionId(applicationInstance.getNetworkPartitionUuid())) { - - log.warn("Application instance with id [ " + applicationInstance.getInstanceId() + " ] already exists"); - + getInstanceByNetworkPartitionId(applicationInstance.getNetworkPartitionId())) { + log.warn("Application instance [AppInstanceId] " + applicationInstance.getInstanceId() + " already exists"); } else { // add application instance to Application Topology - applications.getApplication(event.getApplicationId()).addInstance(applicationInstance.getInstanceId(), applicationInstance); + applications.getApplication(event.getApplicationId()) + .addInstance(applicationInstance.getInstanceId(), applicationInstance); } - notifyEventListeners(event); return true; } -} +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java index 8c3644c..dcae73e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java @@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ApplicationClustersCreatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -73,13 +74,19 @@ public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor List<Cluster> clusters = event.getClusterList(); for (Cluster cluster : clusters) { - String serviceUuid = cluster.getServiceName(); + String applicationId = cluster.getAppId(); + String serviceName = cluster.getServiceName(); String clusterId = cluster.getClusterId(); - TopologyUpdater.acquireWriteLockForService(serviceUuid); + TopologyUpdater.acquireWriteLockForService(serviceName); + try { + // Apply application filter + if(TopologyApplicationFilter.apply(applicationId)) { + continue; + } // Apply service filter - if (TopologyServiceFilter.apply(serviceUuid)) { + if (TopologyServiceFilter.apply(serviceName)) { continue; } @@ -89,18 +96,18 @@ public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor } // Validate event against the existing topology - Service service = topology.getService(serviceUuid); + Service service = topology.getService(serviceName); if (service == null) { if (log.isWarnEnabled()) { log.warn(String.format("Service does not exist: [service] %s", - serviceUuid)); + serviceName)); } return false; } if (service.clusterExists(clusterId)) { if (log.isDebugEnabled()) { log.debug(String.format("Cluster already exists in service: [service] %s " + - "[cluster] %s", serviceUuid, + "[cluster] %s", serviceName, clusterId)); } } else { @@ -115,7 +122,7 @@ public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor } } finally { - TopologyUpdater.releaseWriteLockForService(serviceUuid); + TopologyUpdater.releaseWriteLockForService(serviceName); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java index 0f558b2..8fb28cb 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java @@ -56,12 +56,12 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor { ClusterInstanceCreatedEvent event = (ClusterInstanceCreatedEvent) MessagingUtil. jsonToObject(message, ClusterInstanceCreatedEvent.class); - TopologyUpdater.acquireWriteLockForService(event.getServiceUuid()); + TopologyUpdater.acquireWriteLockForService(event.getServiceName()); try { return doProcess(event, topology); } finally { - TopologyUpdater.releaseWriteLockForService(event.getServiceUuid()); + TopologyUpdater.releaseWriteLockForService(event.getServiceName()); } } else { @@ -76,7 +76,7 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor { private boolean doProcess(ClusterInstanceCreatedEvent event, Topology topology) { - String serviceName = event.getServiceUuid(); + String serviceName = event.getServiceName(); String clusterId = event.getClusterId(); // Apply service filter @@ -90,11 +90,11 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor { } // Validate event against the existing topology - Service service = topology.getService(event.getServiceUuid()); + Service service = topology.getService(event.getServiceName()); if (service == null) { if (log.isWarnEnabled()) { log.warn(String.format("Service does not exist: [service] %s", - event.getServiceUuid())); + event.getServiceName())); } return false; } @@ -108,7 +108,7 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor { if (cluster == null) { if (log.isDebugEnabled()) { - log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceUuid(), + log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(), event.getClusterId())); } return false; @@ -118,7 +118,7 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor { if (cluster.getInstanceContexts(clusterInstance.getInstanceId()) != null) { if (log.isDebugEnabled()) { log.debug(String.format("Cluster Instance already exists in service: " + - "[service] %s [cluster] %s [Instance] %s", event.getServiceUuid(), + "[service] %s [cluster] %s [Instance] %s", event.getServiceName(), event.getClusterId(), clusterInstance.getInstanceId())); } } else { http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java index e3279cd..9460ea6 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java @@ -86,12 +86,12 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor { } else { // Apply changes to the topology - Service service = new Service(event.getServiceName(), event.getServiceType(), event.getServiceUuid()); + Service service = new Service(event.getServiceName(), event.getServiceType()); service.addPorts(event.getPorts()); topology.addService(service); if (log.isInfoEnabled()) { - log.info(String.format("Service created: [service] %s [service-uuid] %s", event.getServiceName(),event.getServiceUuid())); + log.info(String.format("Service created: [service] %s", event.getServiceName())); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java index f9878d6..8fc3376 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java @@ -70,7 +70,7 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor { private boolean doProcess(ServiceRemovedEvent event, Topology topology) { - String serviceName = event.getServiceUuid(); + String serviceName = event.getServiceName(); // Apply service filter if (TopologyServiceFilter.apply(serviceName)) { @@ -81,11 +81,11 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor { notifyEventListeners(event); // Validate event against the existing topology - Service service = topology.getService(event.getServiceUuid()); + Service service = topology.getService(event.getServiceName()); if (service == null) { if (log.isDebugEnabled()) { log.debug(String.format("Service does not exist: [service] %s", - event.getServiceUuid())); + event.getServiceName())); } } else { @@ -93,7 +93,7 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor { topology.removeService(service); if (log.isInfoEnabled()) { - log.info(String.format("Service removed: [service] %s", event.getServiceUuid())); + log.info(String.format("Service removed: [service] %s", event.getServiceName())); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java index 5ab2cf5..d7ab46b 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java @@ -108,21 +108,21 @@ public class TopologyUpdater { /** * Acquires write lock for a Service * - * @param serviceUuid service uuid to acquire write lock + * @param serviceName service name to acquire write lock */ - public static void acquireWriteLockForService(String serviceUuid) { + public static void acquireWriteLockForService(String serviceName) { // acquire read lock for all Applications TopologyManager.acquireReadLockForServices(); - TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceUuid, true); + TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName, true); if (topologyServiceLock == null) { - handleLockNotFound("Topology lock not found for Service " + serviceUuid); + handleLockNotFound("Topology lock not found for Service " + serviceName); } else { topologyServiceLock.acquireWriteLock(); if (log.isDebugEnabled()) { - log.debug("Write lock acquired for Service " + serviceUuid); + log.debug("Write lock acquired for Service " + serviceName); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java index 0dc5e67..c2d1c6c 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java @@ -69,7 +69,6 @@ public class MockHealthStatisticsNotifier implements Runnable { mockMemberContext.getMemberId(), memoryConsumption)); } healthStatisticsPublisher.publish( - System.currentTimeMillis(), mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), mockMemberContext.getNetworkPartitionId(), @@ -94,7 +93,6 @@ public class MockHealthStatisticsNotifier implements Runnable { mockMemberContext.getMemberId(), loadAvereage)); } healthStatisticsPublisher.publish( - System.currentTimeMillis(), mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), mockMemberContext.getNetworkPartitionId(), @@ -118,7 +116,6 @@ public class MockHealthStatisticsNotifier implements Runnable { mockMemberContext.getMemberId(), requestsInFlight)); } inFlightRequestPublisher.publish( - System.currentTimeMillis(), mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), mockMemberContext.getNetworkPartitionId(), http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.python.cartridge.agent/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java b/components/org.apache.stratos.python.cartridge.agent/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java new file mode 100644 index 0000000..620e11b --- /dev/null +++ b/components/org.apache.stratos.python.cartridge.agent/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.python.cartridge.agent.test; + +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.exec.*; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.domain.LoadBalancingIPType; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberInitializedEvent; +import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener; +import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener; +import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.util.MessagingUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.*; +import java.util.concurrent.ExecutorService; + +import static junit.framework.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class PythonCartridgeAgentTest { + + private static final Log log = LogFactory.getLog(PythonCartridgeAgentTest.class); + + private static final String NEW_LINE = System.getProperty("line.separator"); + // private static final long TIMEOUT = 1440000; + private static final long TIMEOUT = 120000; + private static final String CLUSTER_ID = "php.php.domain"; + private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-1"; + private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-1"; + private static final String APP_ID = "application-1"; + private static final String MEMBER_ID = "php.member-1"; + private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1"; + private static final String NETWORK_PARTITION_ID = "network-partition-1"; + private static final String PARTITION_ID = "partition-1"; + private static final String TENANT_ID = "-1234"; + private static final String SERVICE_NAME = "php"; + public static final String SOURCE_PATH = "/tmp/stratos-pca-test-app-path/"; + + private static List<ServerSocket> serverSocketList; + private static Map<String, Executor> executorList; + private final ArtifactUpdatedEvent artifactUpdatedEvent; + private final Boolean expectedResult; + private boolean instanceStarted; + private boolean instanceActivated; + private ByteArrayOutputStreamLocal outputStream; + private boolean eventReceiverInitiated = false; + private TopologyEventReceiver topologyEventReceiver; + private InstanceStatusEventReceiver instanceStatusEventReceiver; + private int cepPort = 7712; + private BrokerService broker = new BrokerService(); + private static final String ACTIVEMQ_AMQP_BIND_ADDRESS = "tcp://localhost:61617"; + private static final String ACTIVEMQ_MQTT_BIND_ADDRESS = "mqtt://localhost:1884"; + private static final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID(); + + public PythonCartridgeAgentTest(ArtifactUpdatedEvent artifactUpdatedEvent, Boolean expectedResult) { + this.artifactUpdatedEvent = artifactUpdatedEvent; + this.expectedResult = expectedResult; + } + + /** + * Setup method for test class + */ + @BeforeClass + public static void oneTimeSetUp() { + // Set jndi.properties.dir system property for initializing event publishers and receivers + System.setProperty("jndi.properties.dir", getResourcesFolderPath()); + } + + /** + * Setup method for test method testPythonCartridgeAgent + */ + @Before + public void setup() { + serverSocketList = new ArrayList<ServerSocket>(); + executorList = new HashMap<String, Executor>(); + try { + broker.addConnector(ACTIVEMQ_AMQP_BIND_ADDRESS); + broker.addConnector(ACTIVEMQ_MQTT_BIND_ADDRESS); + broker.setBrokerName("testBroker"); + broker.setDataDirectory(PythonCartridgeAgentTest.class.getResource("/").getPath() + + File.separator + ".." + File.separator + PYTHON_AGENT_DIR_NAME + File.separator + "activemq-data"); + broker.start(); + log.info("Broker service started!"); + } + catch (Exception e) { + log.error("Error while setting up broker service", e); + } + if (!this.eventReceiverInitiated) { + ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 15); + topologyEventReceiver = new TopologyEventReceiver(); + topologyEventReceiver.setExecutorService(executorService); + topologyEventReceiver.execute(); + + instanceStatusEventReceiver = new InstanceStatusEventReceiver(); + instanceStatusEventReceiver.setExecutorService(executorService); + instanceStatusEventReceiver.execute(); + + this.instanceStarted = false; + instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() { + @Override + protected void onEvent(Event event) { + log.info("Instance started event received"); + instanceStarted = true; + } + }); + + this.instanceActivated = false; + instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() { + @Override + protected void onEvent(Event event) { + log.info("Instance activated event received"); + instanceActivated = true; + } + }); + + this.eventReceiverInitiated = true; + } + // Simulate CEP server socket + startServerSocket(cepPort); + String agentPath = setupPythonAgent(); + log.info("Python agent working directory name: " + PYTHON_AGENT_DIR_NAME); + log.info("Starting python cartridge agent..."); + this.outputStream = executeCommand( + "python " + agentPath + "/agent.py > " + getResourcesFolderPath() + File.separator + ".." + + File.separator + PYTHON_AGENT_DIR_NAME + File.separator + "cartridge-agent.log"); + } + + /** + * TearDown method for test method testPythonCartridgeAgent + */ + @After + public void tearDown() { + for (Map.Entry<String, Executor> entry : executorList.entrySet()) { + try { + String commandText = entry.getKey(); + Executor executor = entry.getValue(); + ExecuteWatchdog watchdog = executor.getWatchdog(); + if (watchdog != null) { + log.info("Terminating process: " + commandText); + watchdog.destroyProcess(); + } + } + catch (Exception ignore) { + } + } + for (ServerSocket serverSocket : serverSocketList) { + try { + log.info("Stopping socket server: " + serverSocket.getLocalSocketAddress()); + serverSocket.close(); + } + catch (IOException ignore) { + } + } + + try { + log.info("Deleting source checkout folder..."); + FileUtils.deleteDirectory(new File(SOURCE_PATH)); + } + catch (Exception ignore) { + + } + + this.instanceStatusEventReceiver.terminate(); + this.topologyEventReceiver.terminate(); + + this.instanceActivated = false; + this.instanceStarted = false; + try { + broker.stop(); + } + catch (Exception e) { + log.error("Error while stopping the broker service", e); + } + } + + + /** + * This method returns a collection of {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent} + * objects as parameters to the test + * + * @return + */ + @Parameterized.Parameters + public static Collection getArtifactUpdatedEventsAsParams() { + ArtifactUpdatedEvent publicRepoEvent = createTestArtifactUpdatedEvent(); + + ArtifactUpdatedEvent privateRepoEvent = createTestArtifactUpdatedEvent(); + privateRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/testrepo.git"); + privateRepoEvent.setRepoUserName("testapache2211"); + privateRepoEvent.setRepoPassword("RExPDGa4GkPJj4kJDzSROQ=="); + + ArtifactUpdatedEvent privateRepoEvent2 = createTestArtifactUpdatedEvent(); + privateRepoEvent2.setRepoURL("https://[email protected]/testapache2211/testrepo.git"); + privateRepoEvent2.setRepoUserName("testapache2211"); + privateRepoEvent2.setRepoPassword("iF7qT+BKKPE3PGV1TeDsJA=="); + + return Arrays.asList(new Object[][]{ + {publicRepoEvent, true}, + {privateRepoEvent, true}, + {privateRepoEvent2, true} + }); + +// return Arrays.asList(new Object[][]{ +// {publicRepoEvent, true} +// }); + + } + + /** + * Creates an {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent} object with a public + * repository URL + * + * @return + */ + private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() { + ArtifactUpdatedEvent publicRepoEvent = new ArtifactUpdatedEvent(); + publicRepoEvent.setClusterId(CLUSTER_ID); + publicRepoEvent.setTenantId(TENANT_ID); + publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git"); + return publicRepoEvent; + } + + @Test(timeout = TIMEOUT) + public void testPythonCartridgeAgent() { + Thread communicatorThread = new Thread(new Runnable() { + @Override + public void run() { + List<String> outputLines = new ArrayList<String>(); + while (!outputStream.isClosed()) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Subscribed to 'topology/#'")) { + sleep(1000); + // Send complete topology event + log.info("Publishing complete topology event..."); + Topology topology = createTestTopology(); + CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); + publishEvent(completeTopologyEvent); + log.info("Complete topology event published"); + + sleep(3000); + // Publish member initialized event + log.info("Publishing member initialized event..."); + MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent( + SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, + PARTITION_ID + ); + publishEvent(memberInitializedEvent); + log.info("Member initialized event published"); + + // Simulate server socket + startServerSocket(8080); + } + if (line.contains("Artifact repository found")) { + // Send artifact updated event + publishEvent(artifactUpdatedEvent); + } + + if (line.contains("Exception in thread") || line.contains("ERROR")) { + //throw new RuntimeException(line); + } + log.info(line); + } + } + sleep(100); + } + } + }); + + communicatorThread.start(); + + while (!instanceActivated) { + // wait until the instance activated event is received. + sleep(2000); + } + + assertTrue("Instance started event was not received", instanceStarted); + assertTrue("Instance activated event was not received", instanceActivated == this.expectedResult); + } + + /** + * Publish messaging event + * + * @param event + */ + private void publishEvent(Event event) { + String topicName = MessagingUtil.getMessageTopicName(event); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topicName); + eventPublisher.publish(event); + } + + /** + * Start server socket + * + * @param port + */ + private void startServerSocket(final int port) { + Thread socketThread = new Thread(new Runnable() { + @Override + public void run() { + try { + ServerSocket serverSocket = new ServerSocket(port); + serverSocketList.add(serverSocket); + log.info("Server socket started on port: " + port); + serverSocket.accept(); + } + catch (IOException e) { + String message = "Could not start server socket: [port] " + port; + log.error(message, e); + throw new RuntimeException(message, e); + } + } + }); + socketThread.start(); + } + + /** + * Create test topology + * + * @return + */ + private Topology createTestTopology() { + Topology topology = new Topology(); + Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant); + topology.addService(service); + + Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME, + AUTOSCALING_POLICY_NAME, APP_ID); + service.addCluster(cluster); + + Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, + CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, + System.currentTimeMillis()); + + member.setDefaultPrivateIP("10.0.0.1"); + member.setDefaultPublicIP("20.0.0.1"); + Properties properties = new Properties(); + properties.setProperty("prop1", "value1"); + member.setProperties(properties); + member.setStatus(MemberStatus.Created); + cluster.addMember(member); + + return topology; + } + + /** + * Return new lines found in the output + * + * @param currentOutputLines current output lines + * @param output output + * @return + */ + private List<String> getNewLines(List<String> currentOutputLines, String output) { + List<String> newLines = new ArrayList<String>(); + + if (StringUtils.isNotBlank(output)) { + String[] lines = output.split(NEW_LINE); + if (lines != null) { + for (String line : lines) { + if (!currentOutputLines.contains(line)) { + currentOutputLines.add(line); + newLines.add(line); + } + } + } + } + return newLines; + } + + /** + * Sleep current thread + * + * @param time + */ + private void sleep(long time) { + try { + Thread.sleep(time); + } + catch (InterruptedException ignore) { + } + } + + /** + * Copy python agent distribution to a new folder, extract it and copy sample configuration files + * + * @return + */ + private String setupPythonAgent() { + try { + log.info("Setting up python cartridge agent..."); + String srcAgentPath = getResourcesFolderPath() + "/../../src/main/python/cartridge.agent/cartridge.agent"; + String destAgentPath = + getResourcesFolderPath() + File.separator + ".." + File.separator + PYTHON_AGENT_DIR_NAME + + "/cartridge.agent"; + FileUtils.copyDirectory(new File(srcAgentPath), new File(destAgentPath)); + + String srcAgentConfPath = getResourcesFolderPath() + "/agent.conf"; + String destAgentConfPath = destAgentPath + "/agent.conf"; + FileUtils.copyFile(new File(srcAgentConfPath), new File(destAgentConfPath)); + + String srcLoggingIniPath = getResourcesFolderPath() + "/logging.ini"; + String destLoggingIniPath = destAgentPath + "/logging.ini"; + FileUtils.copyFile(new File(srcLoggingIniPath), new File(destLoggingIniPath)); + + String srcPayloadPath = getResourcesFolderPath() + "/payload"; + String destPayloadPath = destAgentPath + "/payload"; + FileUtils.copyDirectory(new File(srcPayloadPath), new File(destPayloadPath)); + + log.info("Changing extension scripts permissions"); + File extensionsPath = new File(destAgentPath + "/extensions/bash"); + File[] extensions = extensionsPath.listFiles(); + for (File extension : extensions) { + extension.setExecutable(true); + } + + log.info("Python cartridge agent setup completed"); + + return destAgentPath; + } + catch (Exception e) { + String message = "Could not copy cartridge agent distribution"; + log.error(message, e); + throw new RuntimeException(message, e); + } + } + + /** + * Execute shell command + * + * @param commandText + */ + private ByteArrayOutputStreamLocal executeCommand(final String commandText) { + final ByteArrayOutputStreamLocal outputStream = new ByteArrayOutputStreamLocal(); + try { + CommandLine commandline = CommandLine.parse(commandText); + DefaultExecutor exec = new DefaultExecutor(); + PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream); + exec.setWorkingDirectory(new File( + getResourcesFolderPath() + File.separator + ".." + File.separator + PYTHON_AGENT_DIR_NAME)); + exec.setStreamHandler(streamHandler); + ExecuteWatchdog watchdog = new ExecuteWatchdog(TIMEOUT); + exec.setWatchdog(watchdog); + exec.execute(commandline, new ExecuteResultHandler() { + @Override + public void onProcessComplete(int i) { + log.info(commandText + " process completed"); + } + + @Override + public void onProcessFailed(ExecuteException e) { + log.error(commandText + " process failed", e); + } + }); + executorList.put(commandText, exec); + return outputStream; + } + catch (Exception e) { + log.error(outputStream.toString(), e); + throw new RuntimeException(e); + } + } + + /** + * Get resources folder path + * + * @return + */ + private static String getResourcesFolderPath() { + String path = PythonCartridgeAgentTest.class.getResource(File.separator).getPath(); + return StringUtils.removeEnd(path, File.separator); + } + + /** + * Implements ByteArrayOutputStream.isClosed() method + */ + private class ByteArrayOutputStreamLocal extends ByteArrayOutputStream { + private boolean closed; + + @Override + public void close() throws IOException { + super.close(); + closed = true; + } + + public boolean isClosed() { + return closed; + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java index eeab1bb..abc02e9 100644 --- a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java +++ b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java @@ -69,8 +69,7 @@ public class StratosApiV40Utils { if (cloudControllerServiceClient != null) { - Cartridge cartridgeConfig = ObjectConverter.convertCartridgeBeanToStubCartridgeConfig - (cartridgeDefinitionBean, null, -1); + Cartridge cartridgeConfig = ObjectConverter.convertCartridgeBeanToStubCartridgeConfig(cartridgeDefinitionBean); if (cartridgeConfig == null) { throw new RestAPIException("Populated CartridgeConfig instance is null, cartridge deployment aborted"); @@ -157,7 +156,7 @@ public class StratosApiV40Utils { if (autoscalerServiceClient != null) { org.apache.stratos.autoscaler.stub.autoscale.policy.AutoscalePolicy autoscalePolicy = ObjectConverter. - convertToCCAutoscalerPojo(autoscalePolicyBean,null,-1234); + convertToCCAutoscalerPojo(autoscalePolicyBean); try { autoscalerServiceClient @@ -372,7 +371,7 @@ public class StratosApiV40Utils { for (String cartridgeType : availableCartridges) { Cartridge cartridgeInfo = null; try { - cartridgeInfo = CloudControllerServiceClient.getInstance().getCartridgeByTenant(cartridgeType,-1234); + cartridgeInfo = CloudControllerServiceClient.getInstance().getCartridge(cartridgeType); } catch (Exception e) { if (log.isWarnEnabled()) { log.warn("Error when calling getCartridgeInfo for " + cartridgeType + ", Error: "
