Repository: stratos
Updated Branches:
  refs/heads/4.0.0-grouping 3c2d3e722 -> 8efef751e


adding cluster in active processors to topolgy and application status topic


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/8efef751
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/8efef751
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/8efef751

Branch: refs/heads/4.0.0-grouping
Commit: 8efef751e88bebe62a9bf00c03f9db9b55ee5032
Parents: 3c2d3e7
Author: reka <[email protected]>
Authored: Sat Oct 18 23:05:38 2014 +0530
Committer: reka <[email protected]>
Committed: Sat Oct 18 23:05:38 2014 +0530

----------------------------------------------------------------------
 .../status/ClusterInActivateEvent.java          |  50 +++++++
 .../status/GroupInActivateEvent.java            |  44 ++++++
 .../event/topology/ClusterInActivateEvent.java  |  56 ++++++++
 .../event/topology/GroupInActivateEvent.java    |  43 ++++++
 .../status/ClusterInActivateEventListener.java  |  24 ++++
 .../status/GroupInActivateEventListener.java    |  27 ++++
 .../ClusterInActivateEventListener.java         |  27 ++++
 .../topology/GroupInActivateEventListener.java  |  27 ++++
 ...StatusClusterInActivateMessageProcessor.java |  59 ++++++++
 ...onStatusGroupInActivateMessageProcessor.java |  62 +++++++++
 .../topology/ClusterInActivateProcessor.java    | 135 +++++++++++++++++++
 .../topology/GroupInActivateProcessor.java      | 109 +++++++++++++++
 12 files changed, 663 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/ClusterInActivateEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/ClusterInActivateEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/ClusterInActivateEvent.java
new file mode 100644
index 0000000..e2a5887
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/ClusterInActivateEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.application.status;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class ClusterInActivateEvent extends StatusEvent {
+    private static final long serialVersionUID = 2625412714611885089L;
+
+    private final String serviceName;
+    private final String clusterId;
+    private String appId;
+
+    public ClusterInActivateEvent(String appId, String serviceName, String 
clusterId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.appId = appId;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInActivateEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInActivateEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInActivateEvent.java
new file mode 100644
index 0000000..c7c8d58
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInActivateEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.application.status;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class GroupInActivateEvent extends StatusEvent {
+    private static final long serialVersionUID = 2625412714611885089L;
+
+    private String groupId;
+    private String appId;
+
+    public GroupInActivateEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getGroupId() {
+        return this.groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterInActivateEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterInActivateEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterInActivateEvent.java
new file mode 100644
index 0000000..36ea436
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterInActivateEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.event.Event;
+
+/**
+ * Cluster activated event will be sent by Autoscaler
+ */
+public class ClusterInActivateEvent extends Event {
+
+    private final String serviceName;
+    private final String clusterId;
+    private String appId;
+
+    public ClusterInActivateEvent(String appId, String serviceName, String 
clusterId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.appId = appId;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterActivatedEvent [serviceName=" + serviceName + ", 
clusterStatus=" +
+                "]";
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInActivateEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInActivateEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInActivateEvent.java
new file mode 100644
index 0000000..dd7007b
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInActivateEvent.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.event.Event;
+
+/**
+ * Group Activated Event which will be sent to Topology upon group activation
+ */
+public class GroupInActivateEvent extends Event {
+    private String appId;
+    private String groupId;
+
+    public GroupInActivateEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/ClusterInActivateEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/ClusterInActivateEventListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/ClusterInActivateEventListener.java
new file mode 100644
index 0000000..2ae7335
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/ClusterInActivateEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.application.status;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class ClusterInActivateEventListener extends EventListener{
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInActivateEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInActivateEventListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInActivateEventListener.java
new file mode 100644
index 0000000..ecc4f04
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInActivateEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.application.status;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Created by reka on 9/22/14.
+ */
+public abstract class GroupInActivateEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ClusterInActivateEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ClusterInActivateEventListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ClusterInActivateEventListener.java
new file mode 100644
index 0000000..03a4768
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ClusterInActivateEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Created by reka on 9/17/14.
+ */
+public abstract class ClusterInActivateEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupInActivateEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupInActivateEventListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupInActivateEventListener.java
new file mode 100644
index 0000000..93a5a60
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupInActivateEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * This will get triggered by the groups activation processor after processing 
the event
+ */
+public abstract class GroupInActivateEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusClusterInActivateMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusClusterInActivateMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusClusterInActivateMessageProcessor.java
new file mode 100644
index 0000000..4e2fe7c
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusClusterInActivateMessageProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.application.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.messaging.event.application.status.ClusterActivatedEvent;
+import 
org.apache.stratos.messaging.event.application.status.ClusterInActivateEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class ApplicationStatusClusterInActivateMessageProcessor extends 
MessageProcessor {
+    private static final Log log = 
LogFactory.getLog(ApplicationStatusClusterInActivateMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ClusterInActivateEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ClusterInActivateEvent event = (ClusterInActivateEvent) Util.
+                                                jsonToObject(message, 
ClusterInActivateEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received ClusterInActivateEvent: " + 
event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
cluster activated message using available message processors: [type] %s [body] 
%s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupInActivateMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupInActivateMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupInActivateMessageProcessor.java
new file mode 100644
index 0000000..099b185
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupInActivateMessageProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.application.status;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.messaging.event.application.status.GroupActivatedEvent;
+import 
org.apache.stratos.messaging.event.application.status.GroupInActivateEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationStatusGroupInActivateMessageProcessor extends 
MessageProcessor {
+    private static final Log log =
+            
LogFactory.getLog(ApplicationStatusGroupInActivateMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (GroupInActivateEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            GroupInActivateEvent event =
+                    (GroupInActivateEvent) Util.jsonToObject(message, 
GroupInActivateEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received GroupInActivateEvent: " + 
event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group in activated 
message " +
+                                "using available message processors: [type] %s 
[body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInActivateProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInActivateProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInActivateProcessor.java
new file mode 100644
index 0000000..8156055
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInActivateProcessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Status;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
+import org.apache.stratos.messaging.event.topology.ClusterInActivateEvent;
+import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import 
org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the cluster activated event
+ */
+public class ClusterInActivateProcessor extends MessageProcessor {
+    private static final Log log = 
LogFactory.getLog(ClusterInActivateProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+
+        Topology topology = (Topology) object;
+
+        if (ClusterInActivateEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized()) {
+                return false;
+            }
+
+            // Parse complete message and build event
+            ClusterInActivateEvent event = (ClusterInActivateEvent) Util.
+                    jsonToObject(message, ClusterInActivateEvent.class);
+
+            TopologyUpdater.acquireWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyUpdater.releaseWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
+            }
+
+        }  else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            }
+        }
+    }
+
+    private boolean doProcess (ClusterInActivateEvent event,Topology topology) 
{
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] 
%s", event.getServiceName()));
+                }
+                return false;
+            }
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] 
%s", event.getClusterId()));
+                }
+                return false;
+            }
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster not exists in service: 
[service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
+        } else {
+            // Apply changes to the topology
+            //TODO
+            // cluster.setStatus(Status.Activated);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster updated as activated : %s",
+                        cluster.toString()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
new file mode 100644
index 0000000..36ca259
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.Group;
+import org.apache.stratos.messaging.domain.topology.Status;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
+import org.apache.stratos.messaging.event.topology.GroupInActivateEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import 
org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupInActivateProcessor extends MessageProcessor {
+    private static final Log log = 
LogFactory.getLog(GroupInActivateProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (GroupActivatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInActivateEvent event = (GroupInActivateEvent) Util.
+                    jsonToObject(message, GroupInActivateEvent.class);
+
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            }
+        }
+    }
+
+    private boolean doProcess (GroupInActivateEvent event,Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] 
%s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] 
%s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+        } else {
+            // Apply changes to the topology
+            //TODO
+            // group.setStatus(Status.Activated);
+            if (log.isInfoEnabled())     {
+                log.info(String.format("Group updated as activated : %s",
+                        group.getUniqueIdentifier()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

Reply via email to