Support application update in the Applications Topology
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/21e6c208 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/21e6c208 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/21e6c208 Branch: refs/heads/stratos-4.1.x Commit: 21e6c20815dadab33db5ba61aaf3c1569848afa5 Parents: b5d1020 Author: reka <[email protected]> Authored: Thu Sep 10 16:56:57 2015 +0530 Committer: reka <[email protected]> Committed: Wed Sep 16 10:49:03 2015 +0530 ---------------------------------------------------------------------- .../topic/ApplicationsEventPublisher.java | 20 +++ .../services/impl/AutoscalerServiceImpl.java | 3 + .../application/ApplicationUpdatedEvent.java | 43 ++++++ .../ApplicationUpdatedEventListener.java | 27 ++++ .../ApplicationUpdatedMessageProcessor.java | 152 +++++++++++++++++++ .../ApplicationsMessageProcessorChain.java | 6 + .../application/ApplicationUpdateTest.java | 37 ++++- 7 files changed, 283 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/21e6c208/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java index a5abc26..f66e525 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java @@ -44,15 +44,35 @@ public class ApplicationsEventPublisher { } public static void sendApplicationCreatedEvent(Application application) { + if(log.isInfoEnabled()) { + log.info("Sending application created event for [application] " + + application.getUniqueIdentifier()); + } publishEvent(new ApplicationCreatedEvent(application)); } + public static void sendApplicationUpdated(Application application) { + if(log.isInfoEnabled()) { + log.info("Sending application updated event for [application] " + + application.getUniqueIdentifier()); + } + publishEvent(new ApplicationUpdatedEvent(application)); + } + public static void sendApplicationDeletedEvent(String appId, Set<ClusterDataHolder> clusterData) { + if(log.isInfoEnabled()) { + log.info("Sending application deleted event for [application] " + + appId); + } publishEvent(new ApplicationDeletedEvent(appId, clusterData)); } public static void sendApplicationInstanceCreatedEvent(String appId, ApplicationInstance applicationInstance) { + if(log.isInfoEnabled()) { + log.info("Sending application instnace created event for [application] " + + appId + " [instance] " + applicationInstance.getInstanceId()); + } publishEvent(new ApplicationInstanceCreatedEvent(appId, applicationInstance)); } http://git-wip-us.apache.org/repos/asf/stratos/blob/21e6c208/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java index 47b03bb..d9023f7 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java @@ -200,6 +200,9 @@ public class AutoscalerServiceImpl implements AutoscalerService { //updating the applicationContext AutoscalerContext.getInstance().updateApplicationContext(applicationContext); + //Send application Updated event + ApplicationsEventPublisher.sendApplicationUpdated(application); + if (log.isInfoEnabled()) { log.info(String.format("Application updated successfully: [application-id] %s", applicationId)); http://git-wip-us.apache.org/repos/asf/stratos/blob/21e6c208/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/ApplicationUpdatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/ApplicationUpdatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/ApplicationUpdatedEvent.java new file mode 100644 index 0000000..4c2f7e0 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/ApplicationUpdatedEvent.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.application; + +import org.apache.stratos.messaging.domain.application.Application; +import org.apache.stratos.messaging.event.Event; + +import java.io.Serializable; + +/** + * This event will be fired upon the application updated is detected. + */ +public class ApplicationUpdatedEvent extends Event implements Serializable { + + private static final long serialVersionUID = 2625412714611885089L; + + private Application application; + + public ApplicationUpdatedEvent(Application application) { + this.application = application; + } + + public Application getApplication() { + return application; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/21e6c208/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/ApplicationUpdatedEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/ApplicationUpdatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/ApplicationUpdatedEventListener.java new file mode 100644 index 0000000..fc02e59 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/ApplicationUpdatedEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.application; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * This will get triggered when application update happens + */ +abstract class ApplicationUpdatedEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/21e6c208/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationUpdatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationUpdatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationUpdatedMessageProcessor.java new file mode 100644 index 0000000..99282ca --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationUpdatedMessageProcessor.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.processor.application; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.application.Application; +import org.apache.stratos.messaging.domain.application.Applications; +import org.apache.stratos.messaging.domain.application.ClusterDataHolder; +import org.apache.stratos.messaging.domain.application.Group; +import org.apache.stratos.messaging.event.application.ApplicationCreatedEvent; +import org.apache.stratos.messaging.event.application.ApplicationUpdatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.application.updater.ApplicationsUpdater; +import org.apache.stratos.messaging.util.MessagingUtil; + +import java.util.Set; + +public class ApplicationUpdatedMessageProcessor extends MessageProcessor { + + private static final Log log = LogFactory.getLog(ApplicationUpdatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + + Applications applications = (Applications) object; + + if (ApplicationUpdatedEvent.class.getName().equals(type)) { + if (!applications.isInitialized()) { + return false; + } + + ApplicationUpdatedEvent event = (ApplicationUpdatedEvent) MessagingUtil. + jsonToObject(message, ApplicationUpdatedEvent.class); + if (event == null) { + log.error("Unable to convert the JSON message to ApplicationCreatedEvent"); + return false; + } + + ApplicationsUpdater.acquireWriteLockForApplications(); + try { + return doProcess(event, applications); + + } finally { + ApplicationsUpdater.releaseWriteLockForApplications(); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, applications); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess(ApplicationUpdatedEvent event, Applications applications) { + + // check if required properties are available + if (event.getApplication() == null) { + String errorMsg = "Application object of application updated event is invalid"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + if (event.getApplication().getUniqueIdentifier() == null || + event.getApplication().getUniqueIdentifier().isEmpty()) { + String errorMsg = "App id of application updated event is invalid: [ " + event.getApplication().getUniqueIdentifier() + " ]"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + // check if an Application with same name exists in applications + if (applications.applicationExists(event.getApplication().getUniqueIdentifier())) { + Application updatedApplication = event.getApplication(); + // add application update to applications Topology + Application application = applications. + getApplication(event.getApplication().getUniqueIdentifier()); + //Update Application Recursively + Set<Group> groups = application.getAllGroupsRecursively(); + for (Group group : groups) { + Group updatedGroup = updatedApplication.getGroupRecursively(group.getUniqueIdentifier()); + + if (updatedGroup != null) { + group.setGroupMaxInstances(updatedGroup.getGroupMaxInstances()); + group.setGroupMinInstances(updatedGroup.getGroupMinInstances()); + } else { + log.warn("[Goup] " + group.getUniqueIdentifier() + + " cannot be found in [application] " + application.getUniqueIdentifier()); + } + + } + + Set<ClusterDataHolder> clusterDataHolders = application.getClusterDataRecursively(); + for (ClusterDataHolder dataHolder : clusterDataHolders) { + Set<ClusterDataHolder> updatedClusters = updatedApplication. + getClusterDataRecursively(); + boolean clusterFound = false; + for(ClusterDataHolder updatedCluster : updatedClusters) { + if (updatedCluster.getClusterId().equals(dataHolder.getClusterId())) { + dataHolder.setMinInstances(updatedCluster.getMinInstances()); + dataHolder.setMaxInstances(updatedCluster.getMaxInstances()); + clusterFound = true; + break; + } + } + if(!clusterFound) { + log.warn("[Cluster] " + dataHolder.getClusterId() + + " cannot be found in [application] " + application.getUniqueIdentifier()); + } + + } + + + if (log.isInfoEnabled()) { + log.info("Application with id [ " + event.getApplication().getUniqueIdentifier() + " ] updated"); + } + } else { + if (log.isDebugEnabled()) { + log.debug("Application with id [ " + event.getApplication().getUniqueIdentifier() + " ] not exists"); + } + + } + + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/21e6c208/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationsMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationsMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationsMessageProcessorChain.java index b92a236..bed9b0d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationsMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationsMessageProcessorChain.java @@ -39,6 +39,7 @@ public class ApplicationsMessageProcessorChain extends MessageProcessorChain { private ApplicationInstanceCreatedMessageProcessor applicationInstanceCreatedMessageProcessor; private ApplicationInstanceActivatedMessageProcessor applicationActivatedMessageProcessor; private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor; + private ApplicationUpdatedMessageProcessor applicationUpdatedMessageProcessor; private ApplicationDeletedMessageProcessor applicationDeletedMessageProcessor; private ApplicationInstanceInactivatedMessageProcessor applicationInactivatedMessageProcessor; private ApplicationInstanceTerminatedMessageProcessor applicationTerminatedMessageProcessor; @@ -67,6 +68,9 @@ public class ApplicationsMessageProcessorChain extends MessageProcessorChain { applicationInstanceCreatedMessageProcessor = new ApplicationInstanceCreatedMessageProcessor(); add(applicationInstanceCreatedMessageProcessor); + applicationUpdatedMessageProcessor = new ApplicationUpdatedMessageProcessor(); + add(applicationUpdatedMessageProcessor); + applicationActivatedMessageProcessor = new ApplicationInstanceActivatedMessageProcessor(); add(applicationActivatedMessageProcessor); @@ -112,6 +116,8 @@ public class ApplicationsMessageProcessorChain extends MessageProcessorChain { applicationInstanceCreatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ApplicationCreatedEventListener) { applicationCreatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ApplicationUndeployedEventListener) { + applicationUpdatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ApplicationDeletedEventListener) { applicationDeletedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ApplicationInstanceActivatedEventListener) { http://git-wip-us.apache.org/repos/asf/stratos/blob/21e6c208/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/application/ApplicationUpdateTest.java ---------------------------------------------------------------------- diff --git a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/application/ApplicationUpdateTest.java b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/application/ApplicationUpdateTest.java index b07ea2f..7148660 100644 --- a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/application/ApplicationUpdateTest.java +++ b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/application/ApplicationUpdateTest.java @@ -24,10 +24,20 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.beans.application.ApplicationBean; import org.apache.stratos.common.beans.cartridge.CartridgeGroupBean; import org.apache.stratos.common.beans.policy.deployment.ApplicationPolicyBean; +<<<<<<< HEAD:products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/application/ApplicationUpdateTest.java import org.apache.stratos.integration.common.RestConstants; import org.apache.stratos.integration.common.TopologyHandler; import org.apache.stratos.integration.tests.StratosIntegrationTest; +======= +import org.apache.stratos.integration.tests.RestConstants; +import org.apache.stratos.integration.tests.StratosTestServerManager; +import org.apache.stratos.integration.tests.TopologyHandler; +import org.apache.stratos.messaging.domain.application.Application; +>>>>>>> Support application update in the Applications Topology:products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/application/ApplicationUpdateTest.java import org.apache.stratos.messaging.domain.application.ApplicationStatus; +import org.apache.stratos.messaging.domain.application.ClusterDataHolder; +import org.apache.stratos.messaging.domain.application.Group; +import org.apache.stratos.messaging.message.receiver.application.ApplicationManager; import org.testng.annotations.Test; import static junit.framework.Assert.assertEquals; @@ -134,14 +144,31 @@ public class ApplicationUpdateTest extends StratosIntegrationTest { RestConstants.APPLICATIONS_NAME); assertEquals(updated, true); - topologyHandler.assertGroupInstanceCount(bean.getApplicationId(), "group3-application-update-test", 2); - - topologyHandler.assertClusterMinMemberCount(bean.getApplicationId(), 2); - ApplicationBean updatedBean = (ApplicationBean) restClient.getEntity(RestConstants.APPLICATIONS, "g-sc-G123-1-application-update-test", ApplicationBean.class, RestConstants.APPLICATIONS_NAME); assertEquals(updatedBean.getApplicationId(), "g-sc-G123-1-application-update-test"); + //Need to validate whether the updated taken into the applications Topology + Application application = ApplicationManager.getApplications(). + getApplication(bean.getApplicationId()); + + Group group = application.getGroupRecursively("group3-application-update-test"); + assertEquals(group.getGroupMaxInstances(), 3); + assertEquals(group.getGroupMinInstances(), 2); + log.info("Application update is successfully done for [application] " + + bean.getApplicationId() + " [group] " + group.getUniqueIdentifier()); + + ClusterDataHolder clusterDataHolder = application. + getClusterDataHolderRecursivelyByAlias("c3-1x0-application-update-test"); + assertEquals(clusterDataHolder.getMaxInstances(), 3); + assertEquals(clusterDataHolder.getMinInstances(), 2); + log.info("Application update is successfully done for [application] " + + bean.getApplicationId() + " [Cluster] " + clusterDataHolder.getClusterId()); + + topologyHandler.assertGroupInstanceCount(bean.getApplicationId(), "group3-application-update-test", 2); + + topologyHandler.assertClusterMinMemberCount(bean.getApplicationId(), 2); + boolean removedGroup = restClient.removeEntity(RestConstants.CARTRIDGE_GROUPS, "G1-application-update-test", RestConstants.CARTRIDGE_GROUPS_NAME); assertEquals(removedGroup, false); @@ -244,4 +271,4 @@ public class ApplicationUpdateTest extends StratosIntegrationTest { assertTrue("An error occurred while handling application deployment/undeployment and update", false); } } -} \ No newline at end of file +}
