Repository: incubator-stratos
Updated Branches:
  refs/heads/master 9e3cf62ca -> 8637670b9


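Adding a maintenance-mode message processor for the topology (MemberMaintenanceModeProcessor) with its listener type, registering it in the topology processor chain, and renaming MemberStatus.Maintenance to In_Maintenance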


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/8637670b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/8637670b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/8637670b

Branch: refs/heads/master
Commit: 8637670b9e9720ca8a027959b992f960bdd8265e
Parents: 9e3cf62
Author: rekathiru <[email protected]>
Authored: Wed Feb 19 16:08:31 2014 +0530
Committer: rekathiru <[email protected]>
Committed: Wed Feb 19 16:08:31 2014 +0530

----------------------------------------------------------------------
 .../messaging/domain/topology/MemberStatus.java |   2 +-
 .../topology/MemberMaintenanceListener.java     |  24 +++
 .../MemberMaintenanceModeProcessor.java         | 147 +++++++++++++++++++
 .../topology/TopologyMessageProcessorChain.java |   4 +
 4 files changed, 176 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8637670b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java
index d6d14a8..e3008a3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/MemberStatus.java
@@ -26,6 +26,6 @@ import javax.xml.bind.annotation.XmlRootElement;
  */
 @XmlRootElement
 public enum MemberStatus {
-    Created, Starting, Activated, Suspended, ReadyToShutDown, ShuttingDown, Terminated, Maintenance
+    Created, Starting, Activated, Suspended, ReadyToShutDown, ShuttingDown, Terminated, In_Maintenance
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8637670b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberMaintenanceListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberMaintenanceListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberMaintenanceListener.java
new file mode 100644
index 0000000..6340cf0
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/MemberMaintenanceListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class MemberMaintenanceListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8637670b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
new file mode 100644
index 0000000..bed6430
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class MemberMaintenanceModeProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(MemberMaintenanceModeProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (MemberMaintenanceModeEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            MemberMaintenanceModeEvent event = (MemberMaintenanceModeEvent) Util.
+                                            jsonToObject(message, MemberMaintenanceModeEvent.class);
+
+            // Apply service filter
+            if (TopologyServiceFilter.getInstance().isActive()) {
+                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return false;
+                }
+            }
+
+            // Apply cluster filter
+            if (TopologyClusterFilter.getInstance().isActive()) {
+                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            // Validate event against the existing topology
+            Service service = topology.getService(event.getServiceName());
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s",
+                            event.getServiceName()));
+                }
+                return false;
+            }
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (cluster == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                            event.getServiceName(), event.getClusterId()));
+                }
+                return false;
+            }
+            Member member = cluster.getMember(event.getMemberId());
+            if (member == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply member filter
+            if(TopologyMemberFilter.getInstance().isActive()) {
+                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
+                    }
+                    return false;
+                }
+            }
+
+            if (member.getStatus() == MemberStatus.In_Maintenance) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Member already updated as In_Maintenance: " +
+                            "[service] %s [cluster] %s [member] %s",
+                            event.getServiceName(),
+                            event.getClusterId(),
+                            event.getMemberId()));
+                }
+                return false;
+            }
+
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.In_Maintenance);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8637670b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index 83dde30..7ba2e1d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -40,6 +40,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
     private MemberStartedMessageProcessor memberStartedMessageProcessor;
     private MemberActivatedMessageProcessor memberActivatedMessageProcessor;
     private MemberReadyToShutdownMessageProcessor memberReadyToShutdownProcessor;
+    private MemberMaintenanceModeProcessor memberMaintenanceModeProcessor;
     private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
     private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
 
@@ -72,6 +73,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
         memberReadyToShutdownProcessor = new MemberReadyToShutdownMessageProcessor();
         add(memberReadyToShutdownProcessor);
 
+        memberMaintenanceModeProcessor = new MemberMaintenanceModeProcessor();
+        add(memberMaintenanceModeProcessor);
+
         memberSuspendedMessageProcessor = new MemberSuspendedMessageProcessor();
         add(memberSuspendedMessageProcessor);
 

Reply via email to