Repository: incubator-stratos Updated Branches: refs/heads/master 9e3cf62ca -> 8637670b9
adding Maintenance processor for the Topology Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/8637670b Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/8637670b Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/8637670b Branch: refs/heads/master Commit: 8637670b9e9720ca8a027959b992f960bdd8265e Parents: 9e3cf62 Author: rekathiru <[email protected]> Authored: Wed Feb 19 16:08:31 2014 +0530 Committer: rekathiru <[email protected]> Committed: Wed Feb 19 16:08:31 2014 +0530 ---------------------------------------------------------------------- .../messaging/domain/topology/MemberStatus.java | 2 +- .../topology/MemberMaintenanceListener.java | 24 +++ .../MemberMaintenanceModeProcessor.java | 147 +++++++++++++++++++ .../topology/TopologyMessageProcessorChain.java | 4 + 4 files changed, 176 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8637670b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java index d6d14a8..e3008a3 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java @@ -26,6 +26,6 @@ import javax.xml.bind.annotation.XmlRootElement; */ @XmlRootElement public enum MemberStatus { - Created, Starting, Activated, Suspended, ReadyToShutDown, ShuttingDown, Terminated, Maintenance + Created, Starting, Activated, Suspended, ReadyToShutDown, ShuttingDown, Terminated, In_Maintenance } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8637670b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberMaintenanceListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberMaintenanceListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberMaintenanceListener.java new file mode 100644 index 0000000..6340cf0 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberMaintenanceListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.topology; + +import org.apache.stratos.messaging.listener.EventListener; + +public abstract class MemberMaintenanceListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8637670b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java new file mode 100644 index 0000000..bed6430 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class MemberMaintenanceModeProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(MemberMaintenanceModeProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (MemberMaintenanceModeEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + MemberMaintenanceModeEvent event = (MemberMaintenanceModeEvent) Util. + jsonToObject(message, MemberMaintenanceModeEvent.class); + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return false; + } + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); + } + return false; + } + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); + } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); + } + return false; + } + Member member = cluster.getMember(event.getMemberId()); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + return false; + } + + // Apply member filter + if(TopologyMemberFilter.getInstance().isActive()) { + if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); + } + return false; + } + } + + if (member.getStatus() == MemberStatus.In_Maintenance) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member already updated as In_Maintenance: " + + "[service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + return false; + } + + // Apply changes to the topology + member.setStatus(MemberStatus.In_Maintenance); + + if (log.isInfoEnabled()) { + log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8637670b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index 83dde30..7ba2e1d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -40,6 +40,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { private MemberStartedMessageProcessor memberStartedMessageProcessor; private MemberActivatedMessageProcessor memberActivatedMessageProcessor; private MemberReadyToShutdownMessageProcessor memberReadyToShutdownProcessor; + private MemberMaintenanceModeProcessor memberMaintenanceModeProcessor; private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor; private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor; @@ -72,6 +73,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { memberReadyToShutdownProcessor = new MemberReadyToShutdownMessageProcessor(); add(memberReadyToShutdownProcessor); + memberMaintenanceModeProcessor = new MemberMaintenanceModeProcessor(); + add(memberMaintenanceModeProcessor); + memberSuspendedMessageProcessor = new MemberSuspendedMessageProcessor(); add(memberSuspendedMessageProcessor);
