adding created events for cluster and group in topology and in app statuc topics


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/2414bca4
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/2414bca4
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/2414bca4

Branch: refs/heads/4.0.0-grouping
Commit: 2414bca4bf4ffcd373f010156c7d683afd11567d
Parents: 58f42ce
Author: reka <[email protected]>
Authored: Thu Oct 30 17:57:09 2014 +0530
Committer: reka <[email protected]>
Committed: Thu Oct 30 18:01:38 2014 +0530

----------------------------------------------------------------------
 .../status/AppStatusClusterCreatedEvent.java    |  50 +++++++++
 .../status/AppStatusGroupCreatedEvent.java      |  44 ++++++++
 .../event/topology/GroupCreatedEvent.java       |  43 ++++++++
 .../AppStatusClusterCreatedEventListener.java   |  24 ++++
 .../AppStatusGroupCreatedEventListener.java     |  24 ++++
 .../topology/GroupCreatedEventListener.java     |  27 +++++
 ...AppStatusClusterCreatedMessageProcessor.java |  58 ++++++++++
 .../AppStatusGroupCreatedMessageProcessor.java  |  62 +++++++++++
 .../status/AppStatusMessageProcessorChain.java  |  15 ++-
 .../topology/GroupCreatedProcessor.java         | 109 +++++++++++++++++++
 .../topology/TopologyMessageProcessorChain.java |   6 +
 11 files changed, 461 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java
new file mode 100644
index 0000000..6480af2
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.application.status;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class AppStatusClusterCreatedEvent extends StatusEvent {
+    private static final long serialVersionUID = 2625412714611885089L;
+
+    private final String serviceName;
+    private final String clusterId;
+    private String appId;
+
+    public AppStatusClusterCreatedEvent(String appId, String serviceName, 
String clusterId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.appId = appId;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java
new file mode 100644
index 0000000..04ee30e
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.application.status;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class AppStatusGroupCreatedEvent extends StatusEvent {
+    private static final long serialVersionUID = 2625412714611885089L;
+
+    private String groupId;
+    private String appId;
+
+    public AppStatusGroupCreatedEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getGroupId() {
+        return this.groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java
new file mode 100644
index 0000000..e3794f0
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.event.Event;
+
+/**
+ * Group Activated Event which will be sent to Topology upon group activation
+ */
+public class GroupCreatedEvent extends Event {
+    private String appId;
+    private String groupId;
+
+    public GroupCreatedEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java
new file mode 100644
index 0000000..c0c62f9
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.application.status;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class AppStatusClusterCreatedEventListener extends 
EventListener{
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java
new file mode 100644
index 0000000..82386a3
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.application.status;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class AppStatusGroupCreatedEventListener extends EventListener 
{
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java
new file mode 100644
index 0000000..3fb2d11
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * This will get triggered by the groups activation processor after processing 
the event
+ */
+public abstract class GroupCreatedEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java
new file mode 100644
index 0000000..a743c43
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.application.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.messaging.event.application.status.AppStatusClusterActivatedEvent;
+import 
org.apache.stratos.messaging.event.application.status.AppStatusClusterCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class AppStatusClusterCreatedMessageProcessor extends MessageProcessor {
+    private static final Log log = 
LogFactory.getLog(AppStatusClusterCreatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (AppStatusClusterCreatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            AppStatusClusterCreatedEvent event = 
(AppStatusClusterCreatedEvent) Util.jsonToObject(message, 
AppStatusClusterCreatedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received AppStatusClusterCreatedEvent: " + 
event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
cluster created message using available message processors: [type] %s [body] 
%s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java
new file mode 100644
index 0000000..b9a1c6d
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.application.status;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.messaging.event.application.status.AppStatusGroupActivatedEvent;
+import 
org.apache.stratos.messaging.event.application.status.AppStatusGroupCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class AppStatusGroupCreatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(AppStatusGroupCreatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (AppStatusGroupCreatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            AppStatusGroupCreatedEvent event =
+                    (AppStatusGroupCreatedEvent) Util.jsonToObject(message, 
AppStatusGroupCreatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received AppStatusGroupCreatedEvent: " + 
event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group created message 
" +
+                                "using available message processors: [type] %s 
[body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java
index 14b8bc2..34cd02b 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java
@@ -31,10 +31,12 @@ import 
org.apache.stratos.messaging.message.processor.MessageProcessorChain;
 public class AppStatusMessageProcessorChain extends MessageProcessorChain {
     private static final Log log = 
LogFactory.getLog(AppStatusMessageProcessorChain.class);
 
+    private AppStatusClusterCreatedMessageProcessor 
clusterCreatedMessageProcessor;
     private AppStatusClusterActivatedMessageProcessor 
clusterActivatedMessageProcessor;
     private AppStatusClusterInactivateMessageProcessor 
clusterInActivateMessageProcessor;
     private AppStatusClusterTerminatingMessageProcessor 
clusterTerminatingMessageProcessor;
     private AppStatusClusterTerminatedMessageProcessor 
clusterTerminatedMessageProcessor;
+    private AppStatusGroupCreatedMessageProcessor groupCreatedMessageProcessor;
     private AppStatusGroupActivatedMessageProcessor 
groupActivatedMessageProcessor;
     private AppStatusGroupInactivatedMessageProcessor 
groupInActivateMessageProcessor;
     private AppStatusApplicationActivatedMessageProcessor 
appActivatedMessageProcessor;
@@ -48,6 +50,9 @@ public class AppStatusMessageProcessorChain extends 
MessageProcessorChain {
 
     public void initialize() {
         // Add instance notifier event processors
+        clusterCreatedMessageProcessor= new 
AppStatusClusterCreatedMessageProcessor();
+        add(clusterCreatedMessageProcessor);
+
         clusterActivatedMessageProcessor = new 
AppStatusClusterActivatedMessageProcessor();
         add(clusterActivatedMessageProcessor);
 
@@ -56,9 +61,13 @@ public class AppStatusMessageProcessorChain extends 
MessageProcessorChain {
 
         clusterTerminatingMessageProcessor = new 
AppStatusClusterTerminatingMessageProcessor();
         add(clusterTerminatingMessageProcessor);
+
         clusterTerminatedMessageProcessor = new 
AppStatusClusterTerminatedMessageProcessor();
         add(clusterTerminatedMessageProcessor);
 
+        groupCreatedMessageProcessor = new 
AppStatusGroupCreatedMessageProcessor();
+        add(groupCreatedMessageProcessor);
+
         groupActivatedMessageProcessor = new 
AppStatusGroupActivatedMessageProcessor();
         add(groupActivatedMessageProcessor);
 
@@ -92,10 +101,14 @@ public class AppStatusMessageProcessorChain extends 
MessageProcessorChain {
     }
 
     public void addEventListener(EventListener eventListener) {
-        if (eventListener instanceof AppStatusClusterActivatedEventListener) {
+        if(eventListener instanceof AppStatusClusterCreatedEventListener) {
+            clusterCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof 
AppStatusClusterActivatedEventListener) {
             clusterActivatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof 
AppStatusClusterInactivateEventListener) {
             clusterInActivateMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof AppStatusGroupCreatedEventListener) 
{
+            groupCreatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof 
AppStatusGroupActivatedEventListener) {
             groupActivatedMessageProcessor.addEventListener(eventListener);
         } else if(eventListener instanceof 
AppStatusClusterTerminatedEventListener){

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
new file mode 100644
index 0000000..4a8a744
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.Group;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
+import org.apache.stratos.messaging.event.topology.GroupCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import 
org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupCreatedProcessor extends MessageProcessor {
+    private static final Log log = 
LogFactory.getLog(GroupCreatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (GroupCreatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupCreatedEvent event = (GroupCreatedEvent) Util.
+                    jsonToObject(message, GroupCreatedEvent.class);
+
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            }
+        }
+    }
+
+    private boolean doProcess (GroupCreatedEvent event,Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] 
%s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] 
%s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+        } else {
+            // Apply changes to the topology
+            if (!group.isStateTransitionValid(GroupStatus.Created)) {
+                log.error("Invalid State Transition from " + group.getStatus() 
+ " to " + GroupStatus.Active);
+            }
+            group.setStatus(GroupStatus.Created);
+
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index 4f6d3a9..1ed5576 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -46,6 +46,7 @@ public class TopologyMessageProcessorChain extends 
MessageProcessorChain {
     private MemberMaintenanceModeProcessor memberMaintenanceModeProcessor;
     private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
     private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
+    private GroupCreatedProcessor groupCreatedProcessor;
     private GroupActivatedProcessor groupActivatedProcessor;
     private GroupInActivateProcessor groupInActivateProcessor;
     private ApplicationCreatedMessageProcessor 
applicationCreatedMessageProcessor;
@@ -109,6 +110,9 @@ public class TopologyMessageProcessorChain extends 
MessageProcessorChain {
         memberTerminatedMessageProcessor = new 
MemberTerminatedMessageProcessor();
         add(memberTerminatedMessageProcessor);
 
+        groupCreatedProcessor = new GroupCreatedProcessor();
+        add(groupCreatedProcessor);
+
         groupActivatedProcessor = new GroupActivatedProcessor();
         add(groupActivatedProcessor);
 
@@ -179,6 +183,8 @@ public class TopologyMessageProcessorChain extends 
MessageProcessorChain {
             memberMaintenanceModeProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof GroupActivatedEventListener) {
             groupActivatedProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof GroupCreatedEventListener) {
+            groupCreatedProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof GroupInActivateEventListener) {
             groupInActivateProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof GroupTerminatedEventListener){

Reply via email to