http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java index db5d777..92d61ba 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java @@ -16,48 +16,106 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; -public class ApplicationCreatedMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(ApplicationCreatedMessageProcessor.class); +import java.util.Set; +public class ApplicationCreatedMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class); private MessageProcessor nextProcessor; @Override public void setNext(MessageProcessor nextProcessor) { this.nextProcessor = nextProcessor; - } @Override public boolean process(String type, String message, Object object) { + + Topology topology = (Topology) object; + if (ApplicationCreatedEvent.class.getName().equals(type)) { - // Parse complete message and build event - ApplicationCreatedEvent event = - (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class); + if (!topology.isInitialized()) { + return false; + } - if (log.isDebugEnabled()) { - log.debug("Received ApplicationCreated Event in application status topic: " + event.toString()); + ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class); + if (event == null) { + log.error("Unable to convert the JSON message to ApplicationCreatedEvent"); + return false; } - // Notify event listeners - notifyEventListeners(event); - return true; + + TopologyUpdater.acquireWriteLockForApplications(); + // since the Clusters will also get modified, acquire write locks for each Service Type + Set<ClusterDataHolder> clusterDataHolders = event.getApplication().getClusterDataRecursively(); + if (clusterDataHolders != null) { + for (ClusterDataHolder clusterData : clusterDataHolders) { + TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType()); + } + } + + try { + return doProcess(event, topology); + + } finally { + if (clusterDataHolders != null) { + for (ClusterDataHolder clusterData : clusterDataHolders) { + TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType()); + } + } + TopologyUpdater.releaseWriteLockForApplications(); + } + } else { if (nextProcessor != null) { - return nextProcessor.process(type, message, object); + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); } else { - throw new RuntimeException( - String.format("Failed to process group activated message " + - "using available message processors: [type] %s [body] %s", type, message)); + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } + + private boolean doProcess (ApplicationCreatedEvent event,Topology topology) { + + // check if required properties are available + if (event.getApplication() == null) { + String errorMsg = "Application object of application created event is invalid"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + if (event.getApplication().getUniqueIdentifier() == null || event.getApplication().getUniqueIdentifier().isEmpty()) { + String errorMsg = "App id of application created event is invalid: [ " + event.getApplication().getUniqueIdentifier() + " ]"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + // check if an Application with same name exists in topology + if (topology.applicationExists(event.getApplication().getUniqueIdentifier())) { + log.warn("Application with id [ " + event.getApplication().getUniqueIdentifier() + " ] already exists in Topology"); + + } else { + // add application and the clusters to Topology + for(Cluster cluster: event.getClusterList()) { + topology.getService(cluster.getServiceName()).addCluster(cluster); + } + topology.addApplication(event.getApplication()); + } + + notifyEventListeners(event); + return true; + } }
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java index d8f2aac..91eae8c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java @@ -20,10 +20,17 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.applications.ApplicationInactivatedEvent; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; +/** + * This processor responsible to process the application Inactivation even and update the Topology. + */ public class ApplicationInactivatedMessageProcessor extends MessageProcessor { private static final Log log = LogFactory.getLog(ApplicationInactivatedMessageProcessor.class); @@ -34,30 +41,64 @@ public class ApplicationInactivatedMessageProcessor extends MessageProcessor { @Override public void setNext(MessageProcessor nextProcessor) { this.nextProcessor = nextProcessor; - } + @Override public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + if (ApplicationInactivatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + // Parse complete message and build event - ApplicationInactivatedEvent event = - (ApplicationInactivatedEvent) Util.jsonToObject(message, ApplicationInactivatedEvent.class); + ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util. + jsonToObject(message, ApplicationInactivatedEvent.class); - if (log.isDebugEnabled()) { - log.debug("Received ApplicationInActivatedEvent in application status topic: " + event.toString()); + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); } - // Notify event listeners - notifyEventListeners(event); - return true; + } else { if (nextProcessor != null) { - return nextProcessor.process(type, message, object); + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); } else { - throw new RuntimeException( - String.format("Failed to process group activated message " + - "using available message processors: [type] %s [body] %s", type, message)); + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); } + return false; + } else { + // Apply changes to the topology + if (!application.isStateTransitionValid(ApplicationStatus.Inactive)) { + log.error("Invalid State transfer from [ " + application.getStatus() + + " ] to [ " + ApplicationStatus.Inactive + " ]"); + } + application.setStatus(ApplicationStatus.Inactive); + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java index a121ffb..8cd2182 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java @@ -20,10 +20,18 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent; +import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.topology.ApplicationTerminatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; +import java.util.Set; + +/** + * This processor responsible to process the application Inactivation even and update the Topology. + */ public class ApplicationTerminatedMessageProcessor extends MessageProcessor { private static final Log log = LogFactory.getLog(ApplicationTerminatedMessageProcessor.class); @@ -34,30 +42,95 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor { @Override public void setNext(MessageProcessor nextProcessor) { this.nextProcessor = nextProcessor; - } + @Override public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + if (ApplicationTerminatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + // Parse complete message and build event - ApplicationTerminatedEvent event = - (ApplicationTerminatedEvent) Util.jsonToObject(message, ApplicationTerminatedEvent.class); + ApplicationTerminatedEvent event = (ApplicationTerminatedEvent) Util. + jsonToObject(message, ApplicationTerminatedEvent.class); - if (log.isDebugEnabled()) { - log.debug("Received ApplicationTerminatedEvent in application status topic: " + event.toString()); + TopologyUpdater.acquireWriteLockForApplications(); + Set<ClusterDataHolder> clusterDataHolders = event.getClusterData(); + if (clusterDataHolders != null) { + for (ClusterDataHolder clusterData : clusterDataHolders) { + TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType()); + } } - // Notify event listeners - notifyEventListeners(event); - return true; + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplications(); + if (clusterDataHolders != null) { + for (ClusterDataHolder clusterData : clusterDataHolders) { + TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType()); + } + } + } + } else { if (nextProcessor != null) { - return nextProcessor.process(type, message, object); + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); } else { - throw new RuntimeException( - String.format("Failed to process group activated message " + - "using available message processors: [type] %s [body] %s", type, message)); + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } + + private boolean doProcess (ApplicationTerminatedEvent event, Topology topology) { + + // check if required properties are available + if (event.getAppId() == null) { + String errorMsg = "Application Id of application removed event is invalid"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + if (event.getTenantDomain()== null) { + String errorMsg = "Application tenant domain of application removed event is invalid"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + // check if an Application with same name exists in topology + String appId = event.getAppId(); + if (topology.applicationExists(appId)) { + log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it"); + topology.removeApplication(appId); + } + + if (event.getClusterData() != null) { + // remove the Clusters from the Topology + for (ClusterDataHolder clusterData : event.getClusterData()) { + Service service = topology.getService(clusterData.getServiceType()); + if (service != null) { + service.removeCluster(clusterData.getClusterId()); + if (log.isDebugEnabled()) { + log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology"); + } + } else { + log.warn("Service " + clusterData.getServiceType() + " not found in Topology!"); + } + } + } + + if (log.isDebugEnabled()) { + log.debug("ApplicationRemovedMessageProcessor notifying listener "); + } + + notifyEventListeners(event); + return true; + + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java index 280de2c..057d013 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java @@ -20,10 +20,17 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.applications.ApplicationTerminatingEvent; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ApplicationTerminatingEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; +/** + * This processor responsible to process the application Inactivation even and update the Topology. + */ public class ApplicationTerminatingMessageProcessor extends MessageProcessor { private static final Log log = LogFactory.getLog(ApplicationTerminatingMessageProcessor.class); @@ -34,30 +41,64 @@ public class ApplicationTerminatingMessageProcessor extends MessageProcessor { @Override public void setNext(MessageProcessor nextProcessor) { this.nextProcessor = nextProcessor; - } + @Override public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + if (ApplicationTerminatingEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + // Parse complete message and build event - ApplicationTerminatingEvent event = - (ApplicationTerminatingEvent) Util.jsonToObject(message, ApplicationTerminatingEvent.class); + ApplicationTerminatingEvent event = (ApplicationTerminatingEvent) Util. + jsonToObject(message, ApplicationTerminatingEvent.class); - if (log.isDebugEnabled()) { - log.debug("Received ApplicationTerminatingEvent in application status topic: " + event.toString()); + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); } - // Notify event listeners - notifyEventListeners(event); - return true; + } else { if (nextProcessor != null) { - return nextProcessor.process(type, message, object); + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); } else { - throw new RuntimeException( - String.format("Failed to process group activated message " + - "using available message processors: [type] %s [body] %s", type, message)); + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ApplicationTerminatingEvent event, Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); } + return false; + } else { + // Apply changes to the topology + if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) { + log.error("Invalid State transfer from [ " + application.getStatus() + + " ] to [ " + ApplicationStatus.Terminating + " ]"); + } + application.setStatus(ApplicationStatus.Terminating); + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java new file mode 100644 index 0000000..7e91ab8 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.topology.ApplicationUndeployedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +import java.util.Set; + +public class ApplicationUndeployedMessageProcessor extends MessageProcessor { + + private static final Log log = LogFactory.getLog(ApplicationUndeployedMessageProcessor.class); + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + + Topology topology = (Topology) object; + + if (ApplicationUndeployedEvent.class.getName().equals(type)) { + if (!topology.isInitialized()) { + return false; + } + + ApplicationUndeployedEvent event = (ApplicationUndeployedEvent) + Util.jsonToObject(message, ApplicationUndeployedEvent.class); + if (event == null) { + log.error("Unable to convert the JSON message to ApplicationUndeployedEvent"); + return false; + } + + // get write lock for the application and relevant Clusters + TopologyUpdater.acquireWriteLockForApplication(event.getApplicationId()); + Set<ClusterDataHolder> clusterDataHolders = event.getClusterData(); + if (clusterDataHolders != null) { + for (ClusterDataHolder clusterData : clusterDataHolders) { + TopologyUpdater.acquireWriteLockForCluster(clusterData.getServiceType(), + clusterData.getClusterId()); + } + } + + try { + return doProcess(event, topology); + + } finally { + // remove locks + if (clusterDataHolders != null) { + for (ClusterDataHolder clusterData : clusterDataHolders) { + TopologyUpdater.releaseWriteLockForCluster(clusterData.getServiceType(), + clusterData.getClusterId()); + } + } + TopologyUpdater.releaseWriteLockForApplication(event.getApplicationId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format + ("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ApplicationUndeployedEvent event, Topology topology) { + + // update the application status to Terminating + Application application = topology.getApplication(event.getApplicationId()); + // check and update application status to 'Terminating' + if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) { + log.error("Invalid state transfer from " + application.getStatus() + " to " + ApplicationStatus.Terminating); + } + // for now anyway update the status forcefully + application.setStatus(ApplicationStatus.Terminating); + + // update all the Clusters' statuses to 'Terminating' + Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively(); + // update the Cluster statuses to Terminating + for (ClusterDataHolder clusterDataHolder : clusterData) { + Service service = topology.getService(clusterDataHolder.getServiceType()); + if (service != null) { + Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId()); + if (aCluster != null) { + // validate state transition + if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) { + log.error("Invalid state transfer from " + aCluster.getStatus() + " to " + + ClusterStatus.Terminating); + } + // for now anyway update the status forcefully + aCluster.setStatus(ClusterStatus.Terminating); + + } else { + log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() + + " in Topology"); + } + + } else { + log.warn("Unable to remove cluster with cluster id: " + clusterDataHolder.getClusterId() + " from Topology, " + + " associated Service [ " + clusterDataHolder.getServiceType() + " ] npt found"); + } + } + + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java new file mode 100644 index 0000000..e75b604 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.listener.applications.*; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; + +/** + * Application Status processor chain is to handle the list processors to parse the application + * status. + */ +public class ApplicationsMessageProcessorChain extends MessageProcessorChain { + private static final Log log = LogFactory.getLog(ApplicationsMessageProcessorChain.class); + + private GroupCreatedProcessor groupCreatedMessageProcessor; + private GroupActivatedProcessor groupActivatedMessageProcessor; + private GroupInActivateProcessor groupInActivateMessageProcessor; + private GroupTerminatedProcessor groupTerminatedProcessor; + private GroupTerminatingProcessor groupTerminatingProcessor; + private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor; + private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor; + private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor; + private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor; + private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor; + private ApplicationUndeployedMessageProcessor applicationUndeployedMessageProcessor; + + public void initialize() { + // Add instance notifier event processors + + groupCreatedMessageProcessor = new GroupCreatedProcessor(); + add(groupCreatedMessageProcessor); + + groupActivatedMessageProcessor = new GroupActivatedProcessor(); + add(groupActivatedMessageProcessor); + + groupInActivateMessageProcessor = new GroupInActivateProcessor(); + add(groupInActivateMessageProcessor); + + groupTerminatedProcessor = new GroupTerminatedProcessor(); + add(groupTerminatedProcessor); + + groupTerminatingProcessor = new GroupTerminatingProcessor(); + add(groupTerminatingProcessor); + + applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor(); + add(applicationActivatedMessageProcessor); + + applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor(); + add(applicationCreatedMessageProcessor); + + applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor(); + add(applicationInactivatedMessageProcessor); + + applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor(); + add(applicationTerminatingMessageProcessor); + + applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor(); + add(applicationTerminatedMessageProcessor); + + applicationUndeployedMessageProcessor = new ApplicationUndeployedMessageProcessor(); + add(applicationUndeployedMessageProcessor); + + if (log.isDebugEnabled()) { + log.debug("Instance notifier message processor chain initialized"); + } + } + + public void addEventListener(EventListener eventListener) { + + if(eventListener instanceof GroupCreatedEventListener) { + groupCreatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof GroupInactivateEventListener) { + groupInActivateMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof GroupActivatedEventListener) { + groupActivatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof GroupTerminatingEventListener) { + groupTerminatingProcessor.addEventListener(eventListener); + } else if (eventListener instanceof GroupTerminatedEventListener) { + groupTerminatedProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ApplicationCreatedEventListener) { + applicationCreatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ApplicationActivatedEventListener) { + applicationActivatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ApplicationInactivatedEventListener) { + applicationInactivatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ApplicationTerminatingEventListener) { + applicationTerminatingMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ApplicationTerminatedEventListener) { + applicationTerminatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ApplicationUndeployedEventListener) { + applicationUndeployedMessageProcessor.addEventListener(eventListener); + } else { + throw new RuntimeException("Unknown event listener " + eventListener.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java deleted file mode 100644 index 02ddda8..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.applications; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.applications.GroupActivatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.util.Util; - -public class GroupActivatedMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(GroupActivatedMessageProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - if (GroupActivatedEvent.class.getName().equals(type)) { - // Parse complete message and build event - GroupActivatedEvent event = - (GroupActivatedEvent) Util.jsonToObject(message, GroupActivatedEvent.class); - - if (log.isDebugEnabled()) { - log.debug("Received GroupActivatedEvent: " + event.toString()); - } - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - return nextProcessor.process(type, message, object); - } else { - throw new RuntimeException( - String.format("Failed to process group activated message " + - "using available message processors: [type] %s [body] %s", type, message)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java new file mode 100644 index 0000000..845e933 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Group; +import org.apache.stratos.messaging.domain.applications.GroupStatus; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.topology.GroupActivatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor will act upon the Group activation events + */ +public class GroupActivatedProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(GroupActivatedProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (GroupActivatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + GroupActivatedEvent event = (GroupActivatedEvent) Util. + jsonToObject(message, GroupActivatedEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (GroupActivatedEvent event,Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroupRecursively(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + // Apply changes to the topology + if (!group.isStateTransitionValid(GroupStatus.Active)) { + log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active); + } + group.setStatus(GroupStatus.Active); + + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java deleted file mode 100644 index d04d7f9..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.applications; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.applications.GroupCreatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.util.Util; - -public class GroupCreatedMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(GroupCreatedMessageProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - if (GroupCreatedEvent.class.getName().equals(type)) { - // Parse complete message and build event - GroupCreatedEvent event = - (GroupCreatedEvent) Util.jsonToObject(message, GroupCreatedEvent.class); - - if (log.isDebugEnabled()) { - log.debug("Received AppStatusGroupCreatedEvent: " + event.toString()); - } - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - return nextProcessor.process(type, message, object); - } else { - throw new RuntimeException( - String.format("Failed to process group created message " + - "using available message processors: [type] %s [body] %s", type, message)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java new file mode 100644 index 0000000..47d4457 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Group; +import org.apache.stratos.messaging.domain.applications.GroupStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.GroupCreatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor will act upon the Group activation events + */ +public class GroupCreatedProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(GroupCreatedProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (GroupCreatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + GroupCreatedEvent event = (GroupCreatedEvent) Util. + jsonToObject(message, GroupCreatedEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (GroupCreatedEvent event,Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroupRecursively(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + // Apply changes to the topology + if (!group.isStateTransitionValid(GroupStatus.Created)) { + log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Created + " " + + "for Group " + group.getAlias()); + } + group.setStatus(GroupStatus.Created); + + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java new file mode 100644 index 0000000..063a3de --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Group; +import org.apache.stratos.messaging.domain.applications.GroupStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.GroupInactivateEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor will act upon the Group activation events + */ +public class GroupInActivateProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(GroupInActivateProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (GroupInactivateEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + GroupInactivateEvent event = (GroupInactivateEvent) Util. + jsonToObject(message, GroupInactivateEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess(GroupInactivateEvent event, Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroupRecursively(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + group.setStatus(GroupStatus.Inactive); + if (log.isInfoEnabled()) { + log.info(String.format("Group updated as in-activated : %s", + group.getUniqueIdentifier())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java deleted file mode 100644 index 6cf2587..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.applications; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.applications.AppStatusGroupInactivateEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.util.Util; - -public class GroupInactivatedMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(GroupInactivatedMessageProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - if (AppStatusGroupInactivateEvent.class.getName().equals(type)) { - // Parse complete message and build event - AppStatusGroupInactivateEvent event = - (AppStatusGroupInactivateEvent) Util.jsonToObject(message, AppStatusGroupInactivateEvent.class); - - if (log.isDebugEnabled()) { - log.debug("Received GroupInActivateEvent: " + event.toString()); - } - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - return nextProcessor.process(type, message, object); - } else { - throw new RuntimeException( - String.format("Failed to process group in activated message " + - "using available message processors: [type] %s [body] %s", type, message)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java deleted file mode 100644 index a917a14..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.applications; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.applications.GroupTerminatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.util.Util; - -public class GroupTerminatedMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(GroupTerminatedMessageProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - if (GroupTerminatedEvent.class.getName().equals(type)) { - // Parse complete message and build event - GroupTerminatedEvent event = - (GroupTerminatedEvent) Util.jsonToObject(message, GroupTerminatedEvent.class); - - if (log.isDebugEnabled()) { - log.debug("Received GroupTerminatingEvent: " + event.toString()); - } - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - return nextProcessor.process(type, message, object); - } else { - throw new RuntimeException( - String.format("Failed to process group in GroupTerminatingEvent message " + - "using available message processors: [type] %s [body] %s", type, message)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java new file mode 100644 index 0000000..3de0914 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Group; +import org.apache.stratos.messaging.domain.applications.GroupStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor will act upon the Group activation events + */ +public class GroupTerminatedProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (GroupTerminatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + GroupTerminatedEvent event = (GroupTerminatedEvent) Util. + jsonToObject(message, GroupTerminatedEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (GroupTerminatedEvent event,Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroupRecursively(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + // Apply changes to the topology + if (!group.isStateTransitionValid(GroupStatus.Terminated)) { + log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Terminated); + } + group.setStatus(GroupStatus.Terminated); + + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java deleted file mode 100644 index 63c055d..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.applications; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.applications.GroupTerminatingEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.util.Util; - -public class GroupTerminatingMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(GroupTerminatingMessageProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - if (GroupTerminatingEvent.class.getName().equals(type)) { - // Parse complete message and build event - GroupTerminatingEvent event = - (GroupTerminatingEvent) Util.jsonToObject(message, GroupTerminatingEvent.class); - - if (log.isDebugEnabled()) { - log.debug("Received GroupTerminatingEvent: " + event.toString()); - } - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - return nextProcessor.process(type, message, object); - } else { - throw new RuntimeException( - String.format("Failed to process group in GroupTerminatingEvent message " + - "using available message processors: [type] %s [body] %s", type, message)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java new file mode 100644 index 0000000..e124b7b --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Group; +import org.apache.stratos.messaging.domain.applications.GroupStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor will act upon the Group activation events + */ +public class GroupTerminatingProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (GroupTerminatingEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + GroupTerminatingEvent event = (GroupTerminatingEvent) Util. + jsonToObject(message, GroupTerminatingEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (GroupTerminatingEvent event,Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroupRecursively(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + // Apply changes to the topology + if (!group.isStateTransitionValid(GroupStatus.Terminating)) { + log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active); + } + group.setStatus(GroupStatus.Terminating); + + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java new file mode 100644 index 0000000..694c3f3 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterActivatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + + +public class ClusterStatusClusterActivatedMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ClusterStatusClusterActivatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (ClusterStatusClusterActivatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + ClusterStatusClusterActivatedEvent event = (ClusterStatusClusterActivatedEvent) Util. + jsonToObject(message, ClusterStatusClusterActivatedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received ClusterActivatedEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java new file mode 100644 index 0000000..9b4780b --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterCreatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + + +public class ClusterStatusClusterCreatedMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ClusterStatusClusterCreatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (ClusterStatusClusterCreatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + ClusterStatusClusterCreatedEvent event = (ClusterStatusClusterCreatedEvent) Util. + jsonToObject(message, ClusterStatusClusterCreatedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received AppStatusClusterCreatedEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format("Failed to process cluster created message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +}
