adding message processors for topology
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/1763feac Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/1763feac Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/1763feac Branch: refs/heads/4.0.0-grouping Commit: 1763feac9dbfd7877f288af423661417c5c3fca9 Parents: ad31e2a Author: Udara Liyanage <[email protected]> Authored: Wed Oct 22 13:50:19 2014 +0530 Committer: Udara Liyanage <[email protected]> Committed: Wed Oct 22 13:50:19 2014 +0530 ---------------------------------------------------------------------- .../ApplicationInactivatedMessageProcessor.java | 103 +++++++++++++++++++ .../ApplicationTerminatedMessageProcessor.java | 103 +++++++++++++++++++ .../topology/TopologyMessageProcessorChain.java | 25 ++++- .../ApplicationTerminatingMessageProcessor.java | 102 ++++++++++++++++++ 4 files changed, 332 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/1763feac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java new file mode 100644 index 0000000..986e04e --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.ApplicationStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor is responsible for processing the application inactivated event: it marks the matching application as Inactive in the Topology while holding that application's write lock, notifies the event listeners, and hands any other message type to the next processor. 
+ */ +public class ApplicationInactivatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationInactivatedMessageProcessor.class); + + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (ApplicationInactivatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util. + jsonToObject(message, ApplicationInactivatedEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. 
+ return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [application] %s", + event.getAppId())); + } + return false; + } else { + // Apply changes to the topology: an inactivated application becomes Inactive (not Terminated) + application.setStatus(ApplicationStatus.Inactive); + if (log.isInfoEnabled()) { + log.info(String.format("Application updated as inactivated : %s", + application.toString())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/1763feac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java new file mode 100644 index 0000000..3c9d753 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.ApplicationStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor is responsible for marking an application as Terminated in the Topology and notifying the event listeners; other message types go to the next processor. NOTE(review): it is still keyed on ApplicationInactivatedEvent (copied from the inactivated processor) - bind it to the application terminated event type once that class is confirmed. 
+ */ +public class ApplicationTerminatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationTerminatedMessageProcessor.class); + + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (ApplicationInactivatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util. + jsonToObject(message, ApplicationInactivatedEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. 
+ return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [application] %s", + event.getAppId())); + } + return false; + } else { + // Apply changes to the topology: this processor records the Terminated state (not Terminating) + application.setStatus(ApplicationStatus.Terminated); + if (log.isInfoEnabled()) { + log.info(String.format("Application updated as terminated : %s", + application.toString())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/1763feac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index db9e8b1..b0cefbb 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -22,8 +22,12 @@ package org.apache.stratos.messaging.message.processor.topology; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.listener.application.status.ApplicationInActivatedEventListener; +import org.apache.stratos.messaging.listener.application.status.ApplicationTerminatedEventListener; +import org.apache.stratos.messaging.listener.application.status.ApplicationTerminatingEventListener; import org.apache.stratos.messaging.listener.topology.*; import org.apache.stratos.messaging.message.processor.MessageProcessorChain; +import org.apache.stratos.messaging.message.processor.topology.updater.ApplicationTerminatingMessageProcessor; /** * Defines default topology message processor chain. @@ -51,6 +55,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor; private ApplicationRemovedMessageProcessor applicationRemovedMessageProcessor; private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor; + private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor; + private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor; + private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor; public void initialize() { // Add topology event processors @@ -108,6 +115,15 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor(); add(applicationActivatedMessageProcessor); + applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor(); + add(applicationInactivatedMessageProcessor); + + applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor(); + add(applicationTerminatedMessageProcessor); + + applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor(); + add(applicationTerminatingMessageProcessor); + 
if (log.isDebugEnabled()) { log.debug("Topology message processor chain initialized X1"); } @@ -150,7 +166,14 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { applicationRemovedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ApplicationActivatedEventListener) { applicationActivatedMessageProcessor.addEventListener(eventListener); - } else { + } else if (eventListener instanceof ApplicationInActivatedEventListener){ + applicationInactivatedMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof ApplicationTerminatedEventListener){ + applicationTerminatedMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof ApplicationTerminatingEventListener){ + applicationTerminatingMessageProcessor.addEventListener(eventListener); + } + else { throw new RuntimeException("Unknown event listener"); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/1763feac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java new file mode 100644 index 0000000..fba6b53 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology.updater; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.ApplicationStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor is responsible for marking an application as Terminating in the Topology and notifying the event listeners; other message types go to the next processor. NOTE(review): it is still keyed on ApplicationInactivatedEvent (copied from the inactivated processor) - bind it to the application terminating event type once that class is confirmed. 
+ */ +public class ApplicationTerminatingMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationTerminatingMessageProcessor.class); + + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (ApplicationInactivatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util. + jsonToObject(message, ApplicationInactivatedEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. 
+ return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [application] %s", + event.getAppId())); + } + return false; + } else { + // Apply changes to the topology: this processor records the Terminating state (not Inactive) + application.setStatus(ApplicationStatus.Terminating); + if (log.isInfoEnabled()) { + log.info(String.format("Application updated as terminating : %s", + application.toString())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + + } +}
