adding created events for cluster and group in topology and in app statuc topics
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/2414bca4 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/2414bca4 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/2414bca4 Branch: refs/heads/4.0.0-grouping Commit: 2414bca4bf4ffcd373f010156c7d683afd11567d Parents: 58f42ce Author: reka <[email protected]> Authored: Thu Oct 30 17:57:09 2014 +0530 Committer: reka <[email protected]> Committed: Thu Oct 30 18:01:38 2014 +0530 ---------------------------------------------------------------------- .../status/AppStatusClusterCreatedEvent.java | 50 +++++++++ .../status/AppStatusGroupCreatedEvent.java | 44 ++++++++ .../event/topology/GroupCreatedEvent.java | 43 ++++++++ .../AppStatusClusterCreatedEventListener.java | 24 ++++ .../AppStatusGroupCreatedEventListener.java | 24 ++++ .../topology/GroupCreatedEventListener.java | 27 +++++ ...AppStatusClusterCreatedMessageProcessor.java | 58 ++++++++++ .../AppStatusGroupCreatedMessageProcessor.java | 62 +++++++++++ .../status/AppStatusMessageProcessorChain.java | 15 ++- .../topology/GroupCreatedProcessor.java | 109 +++++++++++++++++++ .../topology/TopologyMessageProcessorChain.java | 6 + 11 files changed, 461 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java new file mode 100644 index 0000000..6480af2 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.application.status; + +/** + * This event is fired by cartridge agent when it has started the server and + * applications are ready to serve the incoming requests. + */ +public class AppStatusClusterCreatedEvent extends StatusEvent { + private static final long serialVersionUID = 2625412714611885089L; + + private final String serviceName; + private final String clusterId; + private String appId; + + public AppStatusClusterCreatedEvent(String appId, String serviceName, String clusterId) { + this.serviceName = serviceName; + this.clusterId = clusterId; + this.appId = appId; + } + + public String getServiceName() { + return serviceName; + } + + public String getClusterId() { + return clusterId; + } + + public String getAppId() { + return appId; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java new file mode 100644 index 0000000..04ee30e --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.application.status; + +/** + * This event is fired by cartridge agent when it has started the server and + * applications are ready to serve the incoming requests. + */ +public class AppStatusGroupCreatedEvent extends StatusEvent { + private static final long serialVersionUID = 2625412714611885089L; + + private String groupId; + private String appId; + + public AppStatusGroupCreatedEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getGroupId() { + return this.groupId; + } + + public String getAppId() { + return appId; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java new file mode 100644 index 0000000..e3794f0 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.topology; + +import org.apache.stratos.messaging.event.Event; + +/** + * Group Activated Event which will be sent to Topology upon group activation + */ +public class GroupCreatedEvent extends Event { + private String appId; + private String groupId; + + public GroupCreatedEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getAppId() { + return appId; + } + + public String getGroupId() { + return groupId; + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java new file mode 100644 index 0000000..c0c62f9 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.application.status; + +import org.apache.stratos.messaging.listener.EventListener; + +public abstract class AppStatusClusterCreatedEventListener extends EventListener{ +} http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java new file mode 100644 index 0000000..82386a3 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.application.status; + +import org.apache.stratos.messaging.listener.EventListener; + +public abstract class AppStatusGroupCreatedEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java new file mode 100644 index 0000000..3fb2d11 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.topology; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * This will get triggered by the groups activation processor after processing the event + */ +public abstract class GroupCreatedEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java new file mode 100644 index 0000000..a743c43 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.application.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.application.status.AppStatusClusterActivatedEvent; +import org.apache.stratos.messaging.event.application.status.AppStatusClusterCreatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + + +public class AppStatusClusterCreatedMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(AppStatusClusterCreatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (AppStatusClusterCreatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + AppStatusClusterCreatedEvent event = (AppStatusClusterCreatedEvent) Util.jsonToObject(message, AppStatusClusterCreatedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received AppStatusClusterCreatedEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format("Failed to process cluster created message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java new file mode 100644 index 0000000..b9a1c6d --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.application.status; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.application.status.AppStatusGroupActivatedEvent; +import org.apache.stratos.messaging.event.application.status.AppStatusGroupCreatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class AppStatusGroupCreatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(AppStatusGroupCreatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (AppStatusGroupCreatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + AppStatusGroupCreatedEvent event = + (AppStatusGroupCreatedEvent) Util.jsonToObject(message, AppStatusGroupCreatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received AppStatusGroupCreatedEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group created message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java index 14b8bc2..34cd02b 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java @@ -31,10 +31,12 @@ import org.apache.stratos.messaging.message.processor.MessageProcessorChain; public class AppStatusMessageProcessorChain extends MessageProcessorChain { private static final Log log = LogFactory.getLog(AppStatusMessageProcessorChain.class); + private AppStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor; private AppStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor; private AppStatusClusterInactivateMessageProcessor clusterInActivateMessageProcessor; private AppStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor; private AppStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor; + private AppStatusGroupCreatedMessageProcessor groupCreatedMessageProcessor; private AppStatusGroupActivatedMessageProcessor groupActivatedMessageProcessor; private AppStatusGroupInactivatedMessageProcessor groupInActivateMessageProcessor; private AppStatusApplicationActivatedMessageProcessor appActivatedMessageProcessor; @@ -48,6 +50,9 @@ public class AppStatusMessageProcessorChain extends MessageProcessorChain { public void initialize() { // Add instance notifier event processors + clusterCreatedMessageProcessor= new AppStatusClusterCreatedMessageProcessor(); + add(clusterCreatedMessageProcessor); + clusterActivatedMessageProcessor = new AppStatusClusterActivatedMessageProcessor(); add(clusterActivatedMessageProcessor); @@ -56,9 +61,13 @@ public class AppStatusMessageProcessorChain extends MessageProcessorChain { clusterTerminatingMessageProcessor = new AppStatusClusterTerminatingMessageProcessor(); add(clusterTerminatingMessageProcessor); + clusterTerminatedMessageProcessor = new AppStatusClusterTerminatedMessageProcessor(); add(clusterTerminatedMessageProcessor); + groupCreatedMessageProcessor = new AppStatusGroupCreatedMessageProcessor(); + add(groupCreatedMessageProcessor); + groupActivatedMessageProcessor = new AppStatusGroupActivatedMessageProcessor(); add(groupActivatedMessageProcessor); @@ -92,10 +101,14 @@ public class AppStatusMessageProcessorChain extends MessageProcessorChain { } public void addEventListener(EventListener eventListener) { - if (eventListener instanceof AppStatusClusterActivatedEventListener) { + if(eventListener instanceof AppStatusClusterCreatedEventListener) { + clusterCreatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AppStatusClusterActivatedEventListener) { clusterActivatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof AppStatusClusterInactivateEventListener) { clusterInActivateMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof AppStatusGroupCreatedEventListener) { + groupCreatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof AppStatusGroupActivatedEventListener) { groupActivatedMessageProcessor.addEventListener(eventListener); } else if(eventListener instanceof AppStatusClusterTerminatedEventListener){ http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java new file mode 100644 index 0000000..4a8a744 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.Group; +import org.apache.stratos.messaging.domain.topology.GroupStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; +import org.apache.stratos.messaging.event.topology.GroupActivatedEvent; +import org.apache.stratos.messaging.event.topology.GroupCreatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor will act upon the Group activation events + */ +public class GroupCreatedProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(GroupCreatedProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (GroupCreatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + GroupCreatedEvent event = (GroupCreatedEvent) Util. + jsonToObject(message, GroupCreatedEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (GroupCreatedEvent event,Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroupRecursively(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + // Apply changes to the topology + if (!group.isStateTransitionValid(GroupStatus.Created)) { + log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active); + } + group.setStatus(GroupStatus.Created); + + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index 4f6d3a9..1ed5576 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -46,6 +46,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { private MemberMaintenanceModeProcessor memberMaintenanceModeProcessor; private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor; private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor; + private GroupCreatedProcessor groupCreatedProcessor; private GroupActivatedProcessor groupActivatedProcessor; private GroupInActivateProcessor groupInActivateProcessor; private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor; @@ -109,6 +110,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { memberTerminatedMessageProcessor = new MemberTerminatedMessageProcessor(); add(memberTerminatedMessageProcessor); + groupCreatedProcessor = new GroupCreatedProcessor(); + add(groupCreatedProcessor); + groupActivatedProcessor = new GroupActivatedProcessor(); add(groupActivatedProcessor); @@ -179,6 +183,8 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { memberMaintenanceModeProcessor.addEventListener(eventListener); } else if (eventListener instanceof GroupActivatedEventListener) { groupActivatedProcessor.addEventListener(eventListener); + } else if (eventListener instanceof GroupCreatedEventListener) { + groupCreatedProcessor.addEventListener(eventListener); } else if (eventListener instanceof GroupInActivateEventListener) { groupInActivateProcessor.addEventListener(eventListener); } else if (eventListener instanceof GroupTerminatedEventListener){
