Repository: stratos Updated Branches: refs/heads/4.0.0-grouping 4eb3e860c -> 069b61a84
correcting wrong states in the Application* processors Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/069b61a8 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/069b61a8 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/069b61a8 Branch: refs/heads/4.0.0-grouping Commit: 069b61a843e6806bef97c7e74718b8551dd5fe89 Parents: 4eb3e86 Author: Isuru Haththotuwa <[email protected]> Authored: Wed Oct 22 14:12:33 2014 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Wed Oct 22 14:12:40 2014 +0530 ---------------------------------------------------------------------- .../ApplicationActivatedMessageProcessor.java | 4 + .../ApplicationInactivatedMessageProcessor.java | 6 +- .../ApplicationTerminatedMessageProcessor.java | 6 +- .../ApplicationTerminatingMessageProcessor.java | 107 +++++++++++++++++++ .../topology/TopologyMessageProcessorChain.java | 1 - .../ApplicationTerminatingMessageProcessor.java | 102 ------------------ 6 files changed, 121 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/069b61a8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java index 8626fca..ed360be 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java @@ -88,6 +88,10 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor { return false; } else { // Apply changes to the topology + if (!application.isStateTransitionValid(ApplicationStatus.Active)) { + log.error("Invalid State transfer from [ " + application.getStatus() + + " ] to [ " + ApplicationStatus.Active + " ]"); + } application.setStatus(ApplicationStatus.Active); if (log.isInfoEnabled()) { log.info(String.format("Application updated as activated : %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/069b61a8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java index 986e04e..a2bdb60 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java @@ -88,7 +88,11 @@ public class ApplicationInactivatedMessageProcessor extends MessageProcessor { return false; } else { // Apply changes to the topology - application.setStatus(ApplicationStatus.Terminated); + if (!application.isStateTransitionValid(ApplicationStatus.Inactive)) { + log.error("Invalid State transfer from [ " + application.getStatus() + + " ] to [ " + ApplicationStatus.Inactive + " ]"); + } + application.setStatus(ApplicationStatus.Inactive); if (log.isInfoEnabled()) { log.info(String.format("Application updated as inactivated : %s", application.toString())); http://git-wip-us.apache.org/repos/asf/stratos/blob/069b61a8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java index 3c9d753..b08e6f9 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java @@ -88,7 +88,11 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor { return false; } else { // Apply changes to the topology - application.setStatus(ApplicationStatus.Terminating); + if (!application.isStateTransitionValid(ApplicationStatus.Terminated)) { + log.error("Invalid State transfer from [ " + application.getStatus() + + " ] to [ " + ApplicationStatus.Terminated + " ]"); + } + application.setStatus(ApplicationStatus.Terminated); if (log.isInfoEnabled()) { log.info(String.format("Application updated as terminating : %s", application.toString())); http://git-wip-us.apache.org/repos/asf/stratos/blob/069b61a8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java new file mode 100644 index 0000000..2c1ed00 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.ApplicationStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor responsible to process the application Inactivation even and update the Topology. + */ +public class ApplicationTerminatingMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationTerminatingMessageProcessor.class); + + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (ApplicationInactivatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util. + jsonToObject(message, ApplicationInactivatedEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } else { + // Apply changes to the topology + if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) { + log.error("Invalid State transfer from [ " + application.getStatus() + + " ] to [ " + ApplicationStatus.Terminating + " ]"); + } + application.setStatus(ApplicationStatus.Terminating); + if (log.isInfoEnabled()) { + log.info(String.format("Application updated as activated : %s", + application.toString())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/069b61a8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index 206b7db..8a54916 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -27,7 +27,6 @@ import org.apache.stratos.messaging.listener.application.status.ApplicationTermi import org.apache.stratos.messaging.listener.application.status.ApplicationTerminatingEventListener; import org.apache.stratos.messaging.listener.topology.*; import org.apache.stratos.messaging.message.processor.MessageProcessorChain; -import org.apache.stratos.messaging.message.processor.topology.updater.ApplicationTerminatingMessageProcessor; /** * Defines default topology message processor chain. http://git-wip-us.apache.org/repos/asf/stratos/blob/069b61a8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java deleted file mode 100644 index fba6b53..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.topology.updater; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.topology.Application; -import org.apache.stratos.messaging.domain.topology.ApplicationStatus; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.util.Util; - -/** - * This processor responsible to process the application Inactivation even and update the Topology. - */ -public class ApplicationTerminatingMessageProcessor extends MessageProcessor { - private static final Log log = - LogFactory.getLog(ApplicationTerminatingMessageProcessor.class); - - - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - - @Override - public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; - - if (ApplicationInactivatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) - return false; - - // Parse complete message and build event - ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util. - jsonToObject(message, ApplicationInactivatedEvent.class); - - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); - - try { - return doProcess(event, topology); - - } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); - } - - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); - } - } - } - - private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) { - - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } else { - // Apply changes to the topology - application.setStatus(ApplicationStatus.Inactive); - if (log.isInfoEnabled()) { - log.info(String.format("Application updated as activated : %s", - application.toString())); - } - } - - // Notify event listeners - notifyEventListeners(event); - return true; - - } -}
