Updated Branches: refs/heads/master 5102bd8bf -> 7fab5047c
introducing new state to member as readytoshutdown - STRATOS-330 Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7fab5047 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7fab5047 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7fab5047 Branch: refs/heads/master Commit: 7fab5047c59b390a2ea5828916f87d2a0f6ce1d4 Parents: 5102bd8 Author: rekathiru <[email protected]> Authored: Fri Jan 3 13:28:10 2014 +0530 Committer: rekathiru <[email protected]> Committed: Fri Jan 3 13:28:10 2014 +0530 ---------------------------------------------------------------------- .../topology/MemberReadyToShutdownEvent.java | 61 ++++++++ .../MemberReadyToShutdownEventListener.java | 25 ++++ .../MemberReadyToShutdownMessageProcessor.java | 146 +++++++++++++++++++ .../topology/TopologyMessageProcessorChain.java | 6 + 4 files changed, 238 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java new file mode 100644 index 0000000..7c93b3e --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberReadyToShutdownEvent.java @@ -0,0 +1,61 @@ +package org.apache.stratos.messaging.event.topology; + +import org.apache.stratos.messaging.domain.topology.MemberStatus; + +import java.io.Serializable; +import java.util.Properties; + +/** + * Topology event introduced for the ready-to-shutdown member state (STRATOS-330). It identifies a member by service name, cluster id, network partition id, partition id and member id. + * <p> + * The five identifiers are assigned once in the constructor and exposed through getters only. {@code status} and {@code properties} are not set by the constructor and stay {@code null} until their setters are called. + * <p> + * NOTE(review): the other new files in this change begin with the ASF license header, this one does not - confirm whether it should be added. + */ +public class 
MemberReadyToShutdownEvent extends TopologyEvent implements Serializable { + private final String serviceName; + private final String clusterId; + private final String networkPartitionId; + private final String partitionId; + private final String memberId; + private MemberStatus status; + private Properties properties; + + /** + * Creates the event from its five identifiers; {@code status} and {@code properties} are left unset. + * + * @param serviceName value returned by {@link #getServiceName()} + * @param clusterId value returned by {@link #getClusterId()} + * @param networkPartitionId value returned by {@link #getNetworkPartitionId()} + * @param partitionId value returned by {@link #getPartitionId()} + * @param memberId value returned by {@link #getMemberId()} + */ + public MemberReadyToShutdownEvent(String serviceName, String clusterId, + String networkPartitionId, String partitionId, String memberId) { + this.serviceName = serviceName; + this.clusterId = clusterId; + this.networkPartitionId = networkPartitionId; + this.partitionId = partitionId; + this.memberId = memberId; + } + + public String getServiceName() { + return serviceName; + } + + public String getClusterId() { + return clusterId; + } + + public String getMemberId() { + return memberId; + } + + public MemberStatus getStatus() { + return status; + } + + public void setStatus(MemberStatus status) { + this.status = status; + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public String getPartitionId() { + return partitionId; + } + + public String getNetworkPartitionId() { + return networkPartitionId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java new file mode 100644 index 0000000..6eeea37 --- /dev/null +++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberReadyToShutdownEventListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.topology; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * Listener type for member-ready-to-shutdown topology events. The class declares no members of its own; it only narrows {@link EventListener} to a distinct type. + * <p> + * In the same change, {@code TopologyMessageProcessorChain} registers listeners that are instances of this type with its {@code MemberReadyToShutdownMessageProcessor}. + */ +public abstract class MemberReadyToShutdownEventListener extends EventListener { + +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java new file mode 100644 index 0000000..b2e43b8 --- /dev/null +++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +/** + * Message processor for messages whose type equals the class name of {@link MemberReadyToShutdownEvent}. For such a message it sets the status of the referenced member to {@code MemberStatus.ReadyToShutDown} and calls {@code notifyEventListeners}; messages of any other type are handed to the next processor. + */ +public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{ + private static final Log log = LogFactory.getLog(MemberReadyToShutdownMessageProcessor.class); + private MessageProcessor nextProcessor; + + /** + * Stores the processor that {@code process} delegates to when the message type is not handled here. + * + * @param nextProcessor processor to delegate to; if it is never set, {@code process} throws for unhandled types + */ + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + 
/** + * Handles the message when {@code type} equals the class name of {@link MemberReadyToShutdownEvent}; any other type is passed to the next processor. + * <p> + * For a matching type the method returns {@code false} before changing the member when the topology is not initialized, when an active service or cluster filter excludes the event, when the service, cluster or member named by the event is not found in the topology, when an active member filter excludes the member's LB cluster id, or when the member status is already {@code ReadyToShutDown}. Otherwise it sets the member status to {@code ReadyToShutDown}, calls {@code notifyEventListeners(event)} and returns {@code true}. + * + * @param type message type, compared with the event class name + * @param message message body, converted to the event through {@code Util.jsonToObject} + * @param object cast to {@code Topology} before the type is checked + * @return {@code true} after a status update, {@code false} when the event is skipped, or the next processor's result for other types + * @throws RuntimeException if the type does not match and no next processor has been set + */ + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (MemberReadyToShutdownEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + MemberReadyToShutdownEvent event = (MemberReadyToShutdownEvent) Util. + jsonToObject(message, MemberReadyToShutdownEvent.class); + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return false; + } + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); + } + return false; + } + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); + } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); + } + return false; + } + Member member = cluster.getMember(event.getMemberId()); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", + 
event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + return false; + } + + // Apply member filter + if(TopologyMemberFilter.getInstance().isActive()) { + if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); + } + return false; + } + } + + if (member.getStatus() == MemberStatus.ReadyToShutDown) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member already updated as Ready to Shutdown: " + + "[service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + return false; + } + + // Apply changes to the topology + member.setStatus(MemberStatus.ReadyToShutDown); + + if (log.isInfoEnabled()) { + log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. 
+ return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7fab5047/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index 1281761..83dde30 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -39,6 +39,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { private InstanceSpawnedMessageProcessor instanceSpawnedMessageProcessor; private MemberStartedMessageProcessor memberStartedMessageProcessor; private MemberActivatedMessageProcessor memberActivatedMessageProcessor; + private MemberReadyToShutdownMessageProcessor memberReadyToShutdownProcessor; private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor; private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor; @@ -68,6 +69,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { memberActivatedMessageProcessor = new MemberActivatedMessageProcessor(); add(memberActivatedMessageProcessor); + memberReadyToShutdownProcessor = new MemberReadyToShutdownMessageProcessor(); + 
add(memberReadyToShutdownProcessor); + memberSuspendedMessageProcessor = new MemberSuspendedMessageProcessor(); add(memberSuspendedMessageProcessor); @@ -92,6 +96,8 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { memberActivatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof MemberStartedEventListener) { memberStartedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof MemberReadyToShutdownEventListener) { + memberReadyToShutdownProcessor.addEventListener(eventListener); } else if (eventListener instanceof MemberSuspendedEventListener) { memberSuspendedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof MemberTerminatedEventListener) {
