http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java new file mode 100644 index 0000000..9520f4e --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppClusterTerminatingMessageProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.AppClusterTerminatingEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + + +public class AppClusterTerminatingMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(AppClusterTerminatingMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (AppClusterTerminatingEvent.class.getName().equals(type)) { + // Parse complete message and build event + AppClusterTerminatingEvent event = (AppClusterTerminatingEvent) Util. + jsonToObject(message, AppClusterTerminatingEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received AppStatusClusterTerminatingEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java new file mode 100644 index 0000000..1524b00 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/AppStatusMessageProcessorChain.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.listener.application.status.*; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; + +/** + * Application Status processor chain is to handle the list processors to parse the application + * status. + */ +public class AppStatusMessageProcessorChain extends MessageProcessorChain { + private static final Log log = LogFactory.getLog(AppStatusMessageProcessorChain.class); + + private AppStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor; + private AppStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor; + private AppStatusClusterInactivateMessageProcessor clusterInActivateMessageProcessor; + private AppStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor; + private AppStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor; + private AppStatusGroupCreatedMessageProcessor groupCreatedMessageProcessor; + private AppStatusGroupActivatedMessageProcessor groupActivatedMessageProcessor; + private AppStatusGroupInactivatedMessageProcessor groupInActivateMessageProcessor; + private AppStatusApplicationActivatedMessageProcessor appActivatedMessageProcessor; + private AppStatusApplicationCreatedMessageProcessor applicationStatusAppCreatedMessageProcessor; + private AppStatusApplicationInactivatedMessageProcessor applicationStatusAppInActivatedMessageProcessor; + private AppStatusApplicationTerminatedMessageProcessor applicationStatusAppTerminatedMessageProcessor; + private AppStatusApplicationTerminatingMessageProcessor applicationStatusAppTerminatingMessageProcessor; + + private AppStatusGroupTerminatedMessageProcessor groupTerminatedMessageProcessor; + private AppStatusGroupTerminatingMessageProcessor groupTerminatingMessageProcessor; + + public void initialize() { + // Add instance notifier event processors + clusterCreatedMessageProcessor= new AppStatusClusterCreatedMessageProcessor(); + add(clusterCreatedMessageProcessor); + + clusterActivatedMessageProcessor = new AppStatusClusterActivatedMessageProcessor(); + add(clusterActivatedMessageProcessor); + + clusterInActivateMessageProcessor = new AppStatusClusterInactivateMessageProcessor(); + add(clusterInActivateMessageProcessor); + + clusterTerminatingMessageProcessor = new AppStatusClusterTerminatingMessageProcessor(); + add(clusterTerminatingMessageProcessor); + + clusterTerminatedMessageProcessor = new AppStatusClusterTerminatedMessageProcessor(); + add(clusterTerminatedMessageProcessor); + + groupCreatedMessageProcessor = new AppStatusGroupCreatedMessageProcessor(); + add(groupCreatedMessageProcessor); + + groupActivatedMessageProcessor = new AppStatusGroupActivatedMessageProcessor(); + add(groupActivatedMessageProcessor); + + groupInActivateMessageProcessor = new AppStatusGroupInactivatedMessageProcessor(); + add(groupInActivateMessageProcessor); + + appActivatedMessageProcessor = new AppStatusApplicationActivatedMessageProcessor(); + add(appActivatedMessageProcessor); + + applicationStatusAppCreatedMessageProcessor = new AppStatusApplicationCreatedMessageProcessor(); + this.add(applicationStatusAppCreatedMessageProcessor); + + applicationStatusAppInActivatedMessageProcessor = new AppStatusApplicationInactivatedMessageProcessor(); + this.add(applicationStatusAppInActivatedMessageProcessor); + + applicationStatusAppTerminatedMessageProcessor = new AppStatusApplicationTerminatedMessageProcessor(); + this.add(applicationStatusAppTerminatedMessageProcessor); + + applicationStatusAppTerminatingMessageProcessor = new AppStatusApplicationTerminatingMessageProcessor(); + this.add(applicationStatusAppTerminatingMessageProcessor); + + groupTerminatedMessageProcessor = new AppStatusGroupTerminatedMessageProcessor(); + this.add(groupTerminatedMessageProcessor); + + groupTerminatingMessageProcessor = new AppStatusGroupTerminatingMessageProcessor(); + this.add(groupTerminatingMessageProcessor); + + if (log.isDebugEnabled()) { + log.debug("Instance notifier message processor chain initialized"); + } + } + + public void addEventListener(EventListener eventListener) { + if(eventListener instanceof AppStatusClusterCreatedEventListener) { + clusterCreatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AppStatusClusterActivatedEventListener) { + clusterActivatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AppStatusClusterInactivateEventListener) { + clusterInActivateMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof AppStatusGroupCreatedEventListener) { + groupCreatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AppStatusGroupActivatedEventListener) { + groupActivatedMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof AppStatusClusterTerminatedEventListener){ + clusterTerminatedMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof AppStatusClusterTerminatingEventListener){ + clusterTerminatingMessageProcessor.addEventListener(eventListener); + }else if (eventListener instanceof AppStatusGroupInactivateEventListener) { + groupInActivateMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AppStatusApplicationActivatedEventListener) { + appActivatedMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof AppStatusApplicationInactivatedEventListener){ + applicationStatusAppInActivatedMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof AppStatusApplicationCreatedEventListener){ + applicationStatusAppCreatedMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof AppStatusApplicationTerminatingEventListener){ + applicationStatusAppTerminatingMessageProcessor.addEventListener(eventListener); + } else if(eventListener instanceof AppStatusApplicationTerminatedEventListener){ + applicationStatusAppTerminatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AppStatusGroupTerminatingEventListener){ + groupTerminatingMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AppStatusGroupTerminatedEventListener){ + groupTerminatedMessageProcessor.addEventListener(eventListener); + } else + { + throw new RuntimeException("Unknown event listener " + eventListener.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java new file mode 100644 index 0000000..71bd50e --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.ApplicationActivatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class ApplicationActivatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationActivatedMessageProcessor.class); + + + private MessageProcessor nextProcessor; + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + + } + + @Override + public boolean process(String type, String message, Object object) { + if (ApplicationActivatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + ApplicationActivatedEvent event = + (ApplicationActivatedEvent) Util.jsonToObject(message, ApplicationActivatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received ApplicationActivatedEvent in application status topic: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group activated message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java new file mode 100644 index 0000000..db5d777 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class ApplicationCreatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationCreatedMessageProcessor.class); + + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + + } + + @Override + public boolean process(String type, String message, Object object) { + if (ApplicationCreatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + ApplicationCreatedEvent event = + (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received ApplicationCreated Event in application status topic: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group activated message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java new file mode 100644 index 0000000..d8f2aac --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.ApplicationInactivatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class ApplicationInactivatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationInactivatedMessageProcessor.class); + + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + + } + + @Override + public boolean process(String type, String message, Object object) { + if (ApplicationInactivatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + ApplicationInactivatedEvent event = + (ApplicationInactivatedEvent) Util.jsonToObject(message, ApplicationInactivatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received ApplicationInActivatedEvent in application status topic: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group activated message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java new file mode 100644 index 0000000..a121ffb --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class ApplicationTerminatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationTerminatedMessageProcessor.class); + + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + + } + + @Override + public boolean process(String type, String message, Object object) { + if (ApplicationTerminatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + ApplicationTerminatedEvent event = + (ApplicationTerminatedEvent) Util.jsonToObject(message, ApplicationTerminatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received ApplicationTerminatedEvent in application status topic: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group activated message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java new file mode 100644 index 0000000..280de2c --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.ApplicationTerminatingEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class ApplicationTerminatingMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationTerminatingMessageProcessor.class); + + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + + } + + @Override + public boolean process(String type, String message, Object object) { + if (ApplicationTerminatingEvent.class.getName().equals(type)) { + // Parse complete message and build event + ApplicationTerminatingEvent event = + (ApplicationTerminatingEvent) Util.jsonToObject(message, ApplicationTerminatingEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received ApplicationTerminatingEvent in application status topic: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group activated message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java new file mode 100644 index 0000000..02ddda8 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.GroupActivatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class GroupActivatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(GroupActivatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (GroupActivatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + GroupActivatedEvent event = + (GroupActivatedEvent) Util.jsonToObject(message, GroupActivatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received GroupActivatedEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group activated message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java new file mode 100644 index 0000000..d04d7f9 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.GroupCreatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class GroupCreatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(GroupCreatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (GroupCreatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + GroupCreatedEvent event = + (GroupCreatedEvent) Util.jsonToObject(message, GroupCreatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received AppStatusGroupCreatedEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group created message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java new file mode 100644 index 0000000..6cf2587 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.AppStatusGroupInactivateEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class GroupInactivatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(GroupInactivatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (AppStatusGroupInactivateEvent.class.getName().equals(type)) { + // Parse complete message and build event + AppStatusGroupInactivateEvent event = + (AppStatusGroupInactivateEvent) Util.jsonToObject(message, AppStatusGroupInactivateEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received GroupInActivateEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group in activated message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java new file mode 100644 index 0000000..a917a14 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.GroupTerminatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class GroupTerminatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(GroupTerminatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (GroupTerminatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + GroupTerminatedEvent event = + (GroupTerminatedEvent) Util.jsonToObject(message, GroupTerminatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received GroupTerminatingEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group in GroupTerminatingEvent message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/f0ba4beb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java new file mode 100644 index 0000000..63c055d --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.applications.GroupTerminatingEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class GroupTerminatingMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(GroupTerminatingMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (GroupTerminatingEvent.class.getName().equals(type)) { + // Parse complete message and build event + GroupTerminatingEvent event = + (GroupTerminatingEvent) Util.jsonToObject(message, GroupTerminatingEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received GroupTerminatingEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group in GroupTerminatingEvent message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +}
