http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java new file mode 100644 index 0000000..69918e2 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterInactivateEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + + +public class ClusterStatusClusterInactivateMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ClusterStatusClusterInactivateMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (ClusterStatusClusterInactivateEvent.class.getName().equals(type)) { + // Parse complete message and build event + ClusterStatusClusterInactivateEvent event = (ClusterStatusClusterInactivateEvent) Util. + jsonToObject(message, ClusterStatusClusterInactivateEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received ClusterInActivateEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java new file mode 100644 index 0000000..64b6c7b --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterTerminatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + + +public class ClusterStatusClusterTerminatedMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ClusterStatusClusterTerminatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (ClusterStatusClusterTerminatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + ClusterStatusClusterTerminatedEvent event = (ClusterStatusClusterTerminatedEvent) Util. + jsonToObject(message, ClusterStatusClusterTerminatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received AppStatusClusterTerminatedEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java new file mode 100644 index 0000000..c161dd5 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterTerminatingEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + + +public class ClusterStatusClusterTerminatingMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ClusterStatusClusterTerminatingMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (ClusterStatusClusterTerminatingEvent.class.getName().equals(type)) { + // Parse complete message and build event + ClusterStatusClusterTerminatingEvent event = (ClusterStatusClusterTerminatingEvent) Util. + jsonToObject(message, ClusterStatusClusterTerminatingEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received AppStatusClusterTerminatingEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java new file mode 100644 index 0000000..29556ec --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.listener.cluster.status.*; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; + +/** + * This is to keep track of the processors for the cluster status topic. + */ +public class ClusterStatusMessageProcessorChain extends MessageProcessorChain { + private static final Log log = LogFactory.getLog(ClusterStatusMessageProcessorChain.class); + + + private ClusterStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor; + private ClusterStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor; + private ClusterStatusClusterInactivateMessageProcessor clusterInactivateMessageProcessor; + private ClusterStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor; + private ClusterStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor; + @Override + protected void initialize() { + clusterCreatedMessageProcessor = new ClusterStatusClusterCreatedMessageProcessor(); + add(clusterCreatedMessageProcessor); + + clusterActivatedMessageProcessor = new ClusterStatusClusterActivatedMessageProcessor(); + add(clusterActivatedMessageProcessor); + + clusterInactivateMessageProcessor = new ClusterStatusClusterInactivateMessageProcessor(); + add(clusterInactivateMessageProcessor); + + clusterTerminatedMessageProcessor = new ClusterStatusClusterTerminatedMessageProcessor(); + add(clusterTerminatedMessageProcessor); + + clusterTerminatingMessageProcessor = new ClusterStatusClusterTerminatingMessageProcessor(); + add(clusterTerminatingMessageProcessor); + + if (log.isDebugEnabled()) { + log.debug("Cluster status message processor chain initialized"); + } + } + + @Override + public void addEventListener(EventListener eventListener) { + if(eventListener instanceof ClusterStatusClusterCreatedEventListener) { + clusterCreatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ClusterStatusClusterInactivateEventListener) { + clusterInactivateMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ClusterStatusClusterActivatedEventListener) { + clusterActivatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ClusterStatusClusterTerminatingEventListener) { + clusterTerminatingMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ClusterStatusClusterTerminatedEventListener) { + clusterTerminatedMessageProcessor.addEventListener(eventListener); + } else { + throw new RuntimeException("Unknown event listener " + eventListener.toString()); + } + + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java deleted file mode 100644 index 341b402..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.ApplicationStatus; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -/** - * This processor responsible to process the application activation even and update the Topology. - */ -public class ApplicationActivatedMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(ApplicationActivatedMessageProcessor.class); - - - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (ApplicationActivatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util. - jsonToObject(message, ApplicationActivatedEvent.class); - - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess(ApplicationActivatedEvent event, Topology topology) { - - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } else { - // Apply changes to the topology - if (!application.isStateTransitionValid(ApplicationStatus.Active)) { - log.error("Invalid State transfer from [ " + application.getStatus() + - " ] to [ " + ApplicationStatus.Active + " ]"); - } - application.setStatus(ApplicationStatus.Active); - - } - - // Notify event listeners - notifyEventListeners(event); - return true; - - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java deleted file mode 100644 index 079cb90..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -import java.util.Set; - -public class ApplicationCreatedMessageProcessor extends MessageProcessor { - - private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - - Topology topology = (Topology) object; - - if (ApplicationCreatedEvent.class.getName().equals(type)) { - if (!topology.isInitialized()) { - return false; - } - - ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class); - if (event == null) { - log.error("Unable to convert the JSON message to ApplicationCreatedEvent"); - return false; - } - - TopologyUpdater.acquireWriteLockForApplications(); - // since the Clusters will also get modified, acquire write locks for each Service Type - Set<ClusterDataHolder> clusterDataHolders = event.getApplication().getClusterDataRecursively(); - if (clusterDataHolders != null) { - for (ClusterDataHolder clusterData : clusterDataHolders) { - TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType()); - } - } - - try { - return doProcess(event, topology); - - } finally { - if (clusterDataHolders != null) { - for (ClusterDataHolder clusterData : clusterDataHolders) { - TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType()); - } - } - TopologyUpdater.releaseWriteLockForApplications(); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (ApplicationCreatedEvent event,Topology topology) { - - // check if required properties are available - if (event.getApplication() == null) { - String errorMsg = "Application object of application created event is invalid"; - log.error(errorMsg); - throw new RuntimeException(errorMsg); - } - - if (event.getApplication().getUniqueIdentifier() == null || event.getApplication().getUniqueIdentifier().isEmpty()) { - String errorMsg = "App id of application created event is invalid: [ " + event.getApplication().getUniqueIdentifier() + " ]"; - log.error(errorMsg); - throw new RuntimeException(errorMsg); - } - - // check if an Application with same name exists in topology - if (topology.applicationExists(event.getApplication().getUniqueIdentifier())) { - log.warn("Application with id [ " + event.getApplication().getUniqueIdentifier() + " ] already exists in Topology"); - - } else { - // add application and the clusters to Topology - for(Cluster cluster: event.getClusterList()) { - topology.getService(cluster.getServiceName()).addCluster(cluster); - } - topology.addApplication(event.getApplication()); - } - - notifyEventListeners(event); - return true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java deleted file mode 100644 index 8c88324..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.ApplicationStatus; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -/** - * This processor responsible to process the application Inactivation even and update the Topology. - */ -public class ApplicationInactivatedMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(ApplicationInactivatedMessageProcessor.class); - - - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (ApplicationInactivatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util. - jsonToObject(message, ApplicationInactivatedEvent.class); - - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) { - - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } else { - // Apply changes to the topology - if (!application.isStateTransitionValid(ApplicationStatus.Inactive)) { - log.error("Invalid State transfer from [ " + application.getStatus() + - " ] to [ " + ApplicationStatus.Inactive + " ]"); - } - application.setStatus(ApplicationStatus.Inactive); - - } - - // Notify event listeners - notifyEventListeners(event); - return true; - - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java deleted file mode 100644 index 2dd3ea7..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; -import org.apache.stratos.messaging.domain.topology.*; -import org.apache.stratos.messaging.event.topology.ApplicationTerminatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -import java.util.Set; - -/** - * This processor responsible to process the application Inactivation even and update the Topology. - */ -public class ApplicationTerminatedMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(ApplicationTerminatedMessageProcessor.class); - - - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (ApplicationTerminatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - ApplicationTerminatedEvent event = (ApplicationTerminatedEvent) Util. - jsonToObject(message, ApplicationTerminatedEvent.class); - - TopologyUpdater.acquireWriteLockForApplications(); - Set<ClusterDataHolder> clusterDataHolders = event.getClusterData(); - if (clusterDataHolders != null) { - for (ClusterDataHolder clusterData : clusterDataHolders) { - TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType()); - } - } - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplications(); - if (clusterDataHolders != null) { - for (ClusterDataHolder clusterData : clusterDataHolders) { - TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType()); - } - } - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (ApplicationTerminatedEvent event, Topology topology) { - - // check if required properties are available - if (event.getAppId() == null) { - String errorMsg = "Application Id of application removed event is invalid"; - log.error(errorMsg); - throw new RuntimeException(errorMsg); - } - - if (event.getTenantDomain()== null) { - String errorMsg = "Application tenant domain of application removed event is invalid"; - log.error(errorMsg); - throw new RuntimeException(errorMsg); - } - - // check if an Application with same name exists in topology - String appId = event.getAppId(); - if (topology.applicationExists(appId)) { - log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it"); - topology.removeApplication(appId); - } - - if (event.getClusterData() != null) { - // remove the Clusters from the Topology - for (ClusterDataHolder clusterData : event.getClusterData()) { - Service service = topology.getService(clusterData.getServiceType()); - if (service != null) { - service.removeCluster(clusterData.getClusterId()); - if (log.isDebugEnabled()) { - log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology"); - } - } else { - log.warn("Service " + clusterData.getServiceType() + " not found in Topology!"); - } - } - } - - if (log.isDebugEnabled()) { - log.debug("ApplicationRemovedMessageProcessor notifying listener "); - } - - notifyEventListeners(event); - return true; - - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java deleted file mode 100644 index 032be79..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.ApplicationStatus; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.ApplicationTerminatingEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -/** - * This processor responsible to process the application Inactivation even and update the Topology. - */ -public class ApplicationTerminatingMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(ApplicationTerminatingMessageProcessor.class); - - - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (ApplicationTerminatingEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - ApplicationTerminatingEvent event = (ApplicationTerminatingEvent) Util. - jsonToObject(message, ApplicationTerminatingEvent.class); - - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (ApplicationTerminatingEvent event, Topology topology) { - - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } else { - // Apply changes to the topology - if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) { - log.error("Invalid State transfer from [ " + application.getStatus() + - " ] to [ " + ApplicationStatus.Terminating + " ]"); - } - application.setStatus(ApplicationStatus.Terminating); - - } - - // Notify event listeners - notifyEventListeners(event); - return true; - - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java deleted file mode 100644 index f368dd1..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.ApplicationStatus; -import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; -import org.apache.stratos.messaging.domain.topology.*; -import org.apache.stratos.messaging.event.topology.ApplicationUndeployedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -import java.util.Set; - -public class ApplicationUndeployedMessageProcessor extends MessageProcessor { - - private static final Log log = LogFactory.getLog(ApplicationUndeployedMessageProcessor.class); - - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - - Topology topology = (Topology) object; - - if (ApplicationUndeployedEvent.class.getName().equals(type)) { - if (!topology.isInitialized()) { - return false; - } - - ApplicationUndeployedEvent event = (ApplicationUndeployedEvent) - Util.jsonToObject(message, ApplicationUndeployedEvent.class); - if (event == null) { - log.error("Unable to convert the JSON message to ApplicationUndeployedEvent"); - return false; - } - - // get write lock for the application and relevant Clusters - TopologyUpdater.acquireWriteLockForApplication(event.getApplicationId()); - Set<ClusterDataHolder> clusterDataHolders = event.getClusterData(); - if (clusterDataHolders != null) { - for (ClusterDataHolder clusterData : clusterDataHolders) { - TopologyUpdater.acquireWriteLockForCluster(clusterData.getServiceType(), - clusterData.getClusterId()); - } - } - - try { - return doProcess(event, topology); - - } finally { - // remove locks - if (clusterDataHolders != null) { - for (ClusterDataHolder clusterData : clusterDataHolders) { - TopologyUpdater.releaseWriteLockForCluster(clusterData.getServiceType(), - clusterData.getClusterId()); - } - } - TopologyUpdater.releaseWriteLockForApplication(event.getApplicationId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format - ("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (ApplicationUndeployedEvent event, Topology topology) { - - // update the application status to Terminating - Application application = topology.getApplication(event.getApplicationId()); - // check and update application status to 'Terminating' - if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) { - log.error("Invalid state transfer from " + application.getStatus() + " to " + ApplicationStatus.Terminating); - } - // for now anyway update the status forcefully - application.setStatus(ApplicationStatus.Terminating); - - // update all the Clusters' statuses to 'Terminating' - Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively(); - // update the Cluster statuses to Terminating - for (ClusterDataHolder clusterDataHolder : clusterData) { - Service service = topology.getService(clusterDataHolder.getServiceType()); - if (service != null) { - Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId()); - if (aCluster != null) { - // validate state transition - if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) { - log.error("Invalid state transfer from " + aCluster.getStatus() + " to " - + ClusterStatus.Terminating); - } - // for now anyway update the status forcefully - aCluster.setStatus(ClusterStatus.Terminating); - - } else { - log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() + - " in Topology"); - } - - } else { - log.warn("Unable to remove cluster with cluster id: " + clusterDataHolder.getClusterId() + " from Topology, " + - " associated Service [ " + clusterDataHolder.getServiceType() + " ] npt found"); - } - } - - notifyEventListeners(event); - return true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java deleted file mode 100644 index 09b9062..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.Group; -import org.apache.stratos.messaging.domain.applications.GroupStatus; -import org.apache.stratos.messaging.domain.topology.*; -import org.apache.stratos.messaging.event.topology.GroupActivatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -/** - * This processor will act upon the Group activation events - */ -public class GroupActivatedProcessor extends MessageProcessor { - private static final Log log = LogFactory.getLog(GroupActivatedProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (GroupActivatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - GroupActivatedEvent event = (GroupActivatedEvent) Util. - jsonToObject(message, GroupActivatedEvent.class); - - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (GroupActivatedEvent event,Topology topology) { - - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } - Group group = application.getGroupRecursively(event.getGroupId()); - - if (group == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), - event.getGroupId())); - } - } else { - // Apply changes to the topology - if (!group.isStateTransitionValid(GroupStatus.Active)) { - log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active); - } - group.setStatus(GroupStatus.Active); - - } - - // Notify event listeners - notifyEventListeners(event); - return true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java deleted file mode 100644 index d053f5d..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.Group; -import org.apache.stratos.messaging.domain.applications.GroupStatus; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.GroupCreatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -/** - * This processor will act upon the Group activation events - */ -public class GroupCreatedProcessor extends MessageProcessor { - private static final Log log = LogFactory.getLog(GroupCreatedProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (GroupCreatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - GroupCreatedEvent event = (GroupCreatedEvent) Util. - jsonToObject(message, GroupCreatedEvent.class); - - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (GroupCreatedEvent event,Topology topology) { - - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } - Group group = application.getGroupRecursively(event.getGroupId()); - - if (group == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), - event.getGroupId())); - } - } else { - // Apply changes to the topology - if (!group.isStateTransitionValid(GroupStatus.Created)) { - log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Created + " " + - "for Group " + group.getAlias()); - } - group.setStatus(GroupStatus.Created); - - } - - // Notify event listeners - notifyEventListeners(event); - return true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java deleted file mode 100644 index add40b4..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.Group; -import org.apache.stratos.messaging.domain.applications.GroupStatus; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.GroupInactivateEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -/** - * This processor will act upon the Group activation events - */ -public class GroupInActivateProcessor extends MessageProcessor { - private static final Log log = LogFactory.getLog(GroupInActivateProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (GroupInactivateEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - GroupInactivateEvent event = (GroupInactivateEvent) Util. - jsonToObject(message, GroupInactivateEvent.class); - - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess(GroupInactivateEvent event, Topology topology) { - - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } - Group group = application.getGroupRecursively(event.getGroupId()); - - if (group == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), - event.getGroupId())); - } - } else { - group.setStatus(GroupStatus.Inactive); - if (log.isInfoEnabled()) { - log.info(String.format("Group updated as in-activated : %s", - group.getUniqueIdentifier())); - } - } - - // Notify event listeners - notifyEventListeners(event); - return true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java deleted file mode 100644 index 767ff71..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.Group; -import org.apache.stratos.messaging.domain.applications.GroupStatus; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -/** - * This processor will act upon the Group activation events - */ -public class GroupTerminatedProcessor extends MessageProcessor { - private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (GroupTerminatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - GroupTerminatedEvent event = (GroupTerminatedEvent) Util. - jsonToObject(message, GroupTerminatedEvent.class); - - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (GroupTerminatedEvent event,Topology topology) { - - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } - Group group = application.getGroupRecursively(event.getGroupId()); - - if (group == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), - event.getGroupId())); - } - } else { - // Apply changes to the topology - if (!group.isStateTransitionValid(GroupStatus.Terminated)) { - log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Terminated); - } - group.setStatus(GroupStatus.Terminated); - - } - - // Notify event listeners - notifyEventListeners(event); - return true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java deleted file mode 100644 index 5b15532..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.Group; -import org.apache.stratos.messaging.domain.applications.GroupStatus; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; -import org.apache.stratos.messaging.util.Util; - -/** - * This processor will act upon the Group activation events - */ -public class GroupTerminatingProcessor extends MessageProcessor { - private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (GroupTerminatingEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - GroupTerminatingEvent event = (GroupTerminatingEvent) Util. - jsonToObject(message, GroupTerminatingEvent.class); - - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (GroupTerminatingEvent event,Topology topology) { - - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } - Group group = application.getGroupRecursively(event.getGroupId()); - - if (group == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), - event.getGroupId())); - } - } else { - // Apply changes to the topology - if (!group.isStateTransitionValid(GroupStatus.Terminating)) { - log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active); - } - group.setStatus(GroupStatus.Terminating); - - } - - // Notify event listeners - notifyEventListeners(event); - return true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index d8d25d9..048e9a3 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -22,7 +22,6 @@ package org.apache.stratos.messaging.message.processor.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.listener.EventListener; -import org.apache.stratos.messaging.listener.applications.ApplicationUndeployedEventListener; import org.apache.stratos.messaging.listener.topology.*; import org.apache.stratos.messaging.message.processor.MessageProcessorChain; @@ -47,17 +46,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { private MemberMaintenanceModeProcessor memberMaintenanceModeProcessor; private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor; private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor; - private GroupCreatedProcessor groupCreatedProcessor; - private GroupActivatedProcessor groupActivatedProcessor; - private GroupInActivateProcessor groupInActivateProcessor; - private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor; - private ApplicationUndeployedMessageProcessor applicationUndeployedMessageProcessor; - private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor; - private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor; - private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor; - private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor; - private GroupTerminatingProcessor groupTerminatingProcessor; - private GroupTerminatedProcessor groupTerminatedProcessor; private ClusterTerminatingProcessor clusterTerminatingProcessor; private ClusterTerminatedProcessor clusterTerminatedProcessor; @@ -111,39 +99,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { memberTerminatedMessageProcessor = new MemberTerminatedMessageProcessor(); add(memberTerminatedMessageProcessor); - groupCreatedProcessor = new GroupCreatedProcessor(); - add(groupCreatedProcessor); - - groupActivatedProcessor = new GroupActivatedProcessor(); - add(groupActivatedProcessor); - - groupInActivateProcessor = new GroupInActivateProcessor(); - add(groupInActivateProcessor); - - groupTerminatingProcessor = new GroupTerminatingProcessor(); - add(groupTerminatingProcessor); - - groupTerminatedProcessor = new GroupTerminatedProcessor(); - add(groupTerminatedProcessor); - - applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor(); - add(applicationCreatedMessageProcessor); - - applicationUndeployedMessageProcessor = new ApplicationUndeployedMessageProcessor(); - add(applicationUndeployedMessageProcessor); - - applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor(); - add(applicationActivatedMessageProcessor); - - applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor(); - add(applicationInactivatedMessageProcessor); - - applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor(); - add(applicationTerminatedMessageProcessor); - - applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor(); - add(applicationTerminatingMessageProcessor); - if (log.isDebugEnabled()) { log.debug("Topology message processor chain initialized X1"); } @@ -182,30 +137,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { serviceRemovedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof MemberMaintenanceListener) { memberMaintenanceModeProcessor.addEventListener(eventListener); - } else if (eventListener instanceof GroupActivatedEventListener) { - groupActivatedProcessor.addEventListener(eventListener); - } else if (eventListener instanceof GroupCreatedEventListener) { - groupCreatedProcessor.addEventListener(eventListener); - } else if (eventListener instanceof GroupInActivateEventListener) { - groupInActivateProcessor.addEventListener(eventListener); - } else if (eventListener instanceof GroupTerminatedEventListener){ - groupTerminatedProcessor.addEventListener(eventListener); - } else if (eventListener instanceof GroupTerminatingEventListener){ - groupTerminatingProcessor.addEventListener(eventListener); - } else if (eventListener instanceof ApplicationCreatedEventListener) { - applicationCreatedMessageProcessor.addEventListener(eventListener); - } else if (eventListener instanceof ApplicationUndeployedEventListener) { - applicationUndeployedMessageProcessor.addEventListener(eventListener); - } else if (eventListener instanceof ApplicationActivatedEventListener) { - applicationActivatedMessageProcessor.addEventListener(eventListener); - } else if (eventListener instanceof ApplicationInActivateEventListener){ - applicationInactivatedMessageProcessor.addEventListener(eventListener); - } else if(eventListener instanceof ApplicationTerminatedEventListener){ - applicationTerminatedMessageProcessor.addEventListener(eventListener); - } else if(eventListener instanceof ApplicationTerminatingEventListener){ - applicationTerminatingMessageProcessor.addEventListener(eventListener); - } - else { + } else { throw new RuntimeException("Unknown event listener"); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java deleted file mode 100644 index 9eda9e0..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.receiver.application.status; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.listener.EventListener; -import org.apache.stratos.messaging.message.processor.MessageProcessorChain; -import org.apache.stratos.messaging.message.processor.application.status.AppStatusMessageProcessorChain; -import org.apache.stratos.messaging.util.Constants; - -import javax.jms.TextMessage; - -public class ApplicationStatusEventMessageDelegator implements Runnable { - private static final Log log = LogFactory.getLog(ApplicationStatusEventMessageDelegator.class); - - private ApplicationStatusEventMessageQueue messageQueue; - private MessageProcessorChain processorChain; - private boolean terminated; - - public ApplicationStatusEventMessageDelegator(ApplicationStatusEventMessageQueue messageQueue) { - this.messageQueue = messageQueue; - this.processorChain = new AppStatusMessageProcessorChain(); - } - - public void addEventListener(EventListener eventListener) { - processorChain.addEventListener(eventListener); - } - - @Override - public void run() { - try { - if (log.isInfoEnabled()) { - log.info("Application status event message delegator started"); - } - - while (!terminated) { - try { - TextMessage message = messageQueue.take(); - - // Retrieve the header - String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); - - // Retrieve the actual message - String json = message.getText(); - - if (log.isDebugEnabled()) { - log.debug(String.format("Application status event message received from queue: %s", type)); - } - - // Delegate message to message processor chain - if (log.isDebugEnabled()) { - log.debug(String.format("Delegating application status event message: %s", type)); - } - processorChain.process(type, json, null); - } catch (Exception e) { - log.error("Failed to retrieve application status event message", e); - } - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Application status event message delegator failed", e); - } - } - } - - /** - * Terminate topology event message delegator thread. - */ - public void terminate() { - terminated = true; - } - - - private EventMessage jsonToEventMessage(String json) { - - EventMessage event = new EventMessage(); - String message; - - //split the message to 3 parts using ':' first is class name, second contains the text 'message' and the third contains - //message - String[] MessageParts = json.split(":", 3); - - String eventType = MessageParts[0].trim(); - eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\"")); - if(log.isDebugEnabled()){ - log.debug(String.format("Extracted [event type] %s", eventType)); - } - - event.setEventName(eventType); - String messageTag = MessageParts[1]; - messageTag = messageTag.substring(messageTag.indexOf("\"") + 1, messageTag.lastIndexOf("\"")); - - if("message".equals(messageTag)){ - message = MessageParts[2].trim(); - //Remove trailing bracket twice to get the message - message = message.substring(0, message.lastIndexOf("}")).trim(); - message = message.substring(0, message.lastIndexOf("}")).trim(); - if(message.indexOf('{') == 0 && message.indexOf('}') == message.length() - 1){ - if(log.isDebugEnabled()) { - log.debug(String.format("[Extracted message] %s ", message)); - } - event.setMessage(message); - return event; - } - } - return null; - } - - private class EventMessage { - private String eventName; - private String message; - - private String getEventName() { - return eventName; - } - - private void setEventName(String eventName) { - this.eventName = eventName; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - } -}
