Repository: stratos Updated Branches: refs/heads/4.0.0-grouping 3c2d3e722 -> 8efef751e
adding cluster in active processors to topolgy and application status topic Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/8efef751 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/8efef751 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/8efef751 Branch: refs/heads/4.0.0-grouping Commit: 8efef751e88bebe62a9bf00c03f9db9b55ee5032 Parents: 3c2d3e7 Author: reka <[email protected]> Authored: Sat Oct 18 23:05:38 2014 +0530 Committer: reka <[email protected]> Committed: Sat Oct 18 23:05:38 2014 +0530 ---------------------------------------------------------------------- .../status/ClusterInActivateEvent.java | 50 +++++++ .../status/GroupInActivateEvent.java | 44 ++++++ .../event/topology/ClusterInActivateEvent.java | 56 ++++++++ .../event/topology/GroupInActivateEvent.java | 43 ++++++ .../status/ClusterInActivateEventListener.java | 24 ++++ .../status/GroupInActivateEventListener.java | 27 ++++ .../ClusterInActivateEventListener.java | 27 ++++ .../topology/GroupInActivateEventListener.java | 27 ++++ ...StatusClusterInActivateMessageProcessor.java | 59 ++++++++ ...onStatusGroupInActivateMessageProcessor.java | 62 +++++++++ .../topology/ClusterInActivateProcessor.java | 135 +++++++++++++++++++ .../topology/GroupInActivateProcessor.java | 109 +++++++++++++++ 12 files changed, 663 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/ClusterInActivateEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/ClusterInActivateEvent.java 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/ClusterInActivateEvent.java new file mode 100644 index 0000000..e2a5887 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/ClusterInActivateEvent.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.application.status; + +/** + * Application status event that identifies a cluster by application id, + * service name and cluster id for the cluster in-activate notification. 
+ */ +public class ClusterInActivateEvent extends StatusEvent { + private static final long serialVersionUID = 2625412714611885089L; + + private final String serviceName; + private final String clusterId; + private String appId; + + public ClusterInActivateEvent(String appId, String serviceName, String clusterId) { + this.serviceName = serviceName; + this.clusterId = clusterId; + this.appId = appId; + } + + public String getServiceName() { + return serviceName; + } + + public String getClusterId() { + return clusterId; + } + + public String getAppId() { + return appId; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInActivateEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInActivateEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInActivateEvent.java new file mode 100644 index 0000000..c7c8d58 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInActivateEvent.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.application.status; + +/** + * Application status event that identifies a group by application id and + * group id for the group in-activate notification. + */ +public class GroupInActivateEvent extends StatusEvent { + private static final long serialVersionUID = 2625412714611885089L; + + private String groupId; + private String appId; + + public GroupInActivateEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getGroupId() { + return this.groupId; + } + + public String getAppId() { + return appId; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterInActivateEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterInActivateEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterInActivateEvent.java new file mode 100644 index 0000000..36ea436 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterInActivateEvent.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.topology; + +import org.apache.stratos.messaging.event.Event; + +/** + * Topology event identifying a cluster (application id, service name, cluster id) for the cluster in-activate notification; toString() reports all three identifiers + */ +public class ClusterInActivateEvent extends Event { + + private final String serviceName; + private final String clusterId; + private final String appId; + + public ClusterInActivateEvent(String appId, String serviceName, String clusterId) { + this.serviceName = serviceName; + this.clusterId = clusterId; + this.appId = appId; + } + + public String getServiceName() { + return serviceName; + } + + @Override + public String toString() { + return "ClusterInActivateEvent [serviceName=" + serviceName + ", clusterId=" + clusterId + + ", appId=" + appId + "]"; + } + + public String getClusterId() { + return clusterId; + } + + public String getAppId() { + return appId; + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInActivateEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInActivateEvent.java 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInActivateEvent.java new file mode 100644 index 0000000..dd7007b --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInActivateEvent.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.messaging.event.topology; + +import org.apache.stratos.messaging.event.Event; + +/** + * Topology event identifying a group (application id and group id) for the group in-activate notification + */ +public class GroupInActivateEvent extends Event { + private String appId; + private String groupId; + + public GroupInActivateEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getAppId() { + return appId; + } + + public String getGroupId() { + return groupId; + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/ClusterInActivateEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/ClusterInActivateEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/ClusterInActivateEventListener.java new file mode 100644 index 0000000..2ae7335 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/ClusterInActivateEventListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.application.status; + +import org.apache.stratos.messaging.listener.EventListener; +/** Marker listener type with no members of its own; presumably registered for application status ClusterInActivateEvent notifications (TODO confirm against the event receiver). */ +public abstract class ClusterInActivateEventListener extends EventListener{ +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInActivateEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInActivateEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInActivateEventListener.java new file mode 100644 index 0000000..ecc4f04 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInActivateEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.application.status; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * Marker listener type with no members of its own; presumably registered for application status GroupInActivateEvent notifications (TODO confirm against the event receiver). + */ +public abstract class GroupInActivateEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ClusterInActivateEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ClusterInActivateEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ClusterInActivateEventListener.java new file mode 100644 index 0000000..03a4768 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ClusterInActivateEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.topology; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * Marker listener type with no members of its own; presumably registered for topology ClusterInActivateEvent notifications (TODO confirm against the event receiver). + */ +public abstract class ClusterInActivateEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupInActivateEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupInActivateEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupInActivateEventListener.java new file mode 100644 index 0000000..93a5a60 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupInActivateEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.topology; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * Marker listener type for the topology GroupInActivateEvent; presumably notified by the group in-activate processor after it has handled the event (TODO confirm) + */ +public abstract class GroupInActivateEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusClusterInActivateMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusClusterInActivateMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusClusterInActivateMessageProcessor.java new file mode 100644 index 0000000..4e2fe7c --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusClusterInActivateMessageProcessor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.application.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.application.status.ClusterActivatedEvent; +import org.apache.stratos.messaging.event.application.status.ClusterInActivateEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +/** Processor for application status ClusterInActivateEvent messages: a message whose type matches is parsed and handed to the event listeners (returns true); any other type is delegated to the next processor, and a RuntimeException is thrown when there is none. */ +public class ApplicationStatusClusterInActivateMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ApplicationStatusClusterInActivateMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (ClusterInActivateEvent.class.getName().equals(type)) { + // Type matches this processor: deserialize the JSON body into the event + ClusterInActivateEvent event = (ClusterInActivateEvent) Util. 
+ jsonToObject(message, ClusterInActivateEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received ClusterInActivateEvent: " + event.toString()); + } + // Notify event listeners; returning true reports the message as handled here + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format("Failed to process cluster in-activate message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupInActivateMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupInActivateMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupInActivateMessageProcessor.java new file mode 100644 index 0000000..099b185 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupInActivateMessageProcessor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.application.status; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.application.status.GroupActivatedEvent; +import org.apache.stratos.messaging.event.application.status.GroupInActivateEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; +/** Processor for application status GroupInActivateEvent messages: a message whose type matches is parsed and handed to the event listeners (returns true); any other type is delegated to the next processor, and a RuntimeException is thrown when there is none. */ +public class ApplicationStatusGroupInActivateMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationStatusGroupInActivateMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (GroupInActivateEvent.class.getName().equals(type)) { + // Type matches this processor: deserialize the JSON body into the event + GroupInActivateEvent event = + (GroupInActivateEvent) Util.jsonToObject(message, GroupInActivateEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received GroupInActivateEvent: " + event.toString()); + } + // Notify event listeners; returning true reports the message as handled here + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group in activated message " + + "using available message processors: [type] %s [body] %s", type, message)); 
+ } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInActivateProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInActivateProcessor.java new file mode 100644 index 0000000..8156055 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInActivateProcessor.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.Status; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent; +import org.apache.stratos.messaging.event.topology.ClusterInActivateEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor handles the topology ClusterInActivateEvent: under the cluster write lock it applies the service/cluster filters, validates the event against the topology and notifies event listeners; other message types go to the next processor + */ +public class ClusterInActivateProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ClusterInActivateProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + + Topology topology = (Topology) object; + + if (ClusterInActivateEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) { + return false; + } + + // Parse complete message and build event + ClusterInActivateEvent event = (ClusterInActivateEvent) Util. 
+ jsonToObject(message, ClusterInActivateEvent.class); + + TopologyUpdater.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ClusterInActivateEvent event,Topology topology) { + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return false; + } + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); + } + return false; + } + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); + } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(), + 
event.getClusterId())); + } + } else { + // Apply changes to the topology + //TODO the status update below is still commented out, so the topology is not modified here yet; NOTE(review): the log text says "activated" although this is the in-activate processor + // cluster.setStatus(Status.Activated); + if (log.isInfoEnabled()) { + log.info(String.format("Cluster updated as activated : %s", + cluster.toString())); + } + } + + // Notify event listeners; NOTE(review): this is also reached when the cluster was not found above - confirm that is intended + notifyEventListeners(event); + return true; + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8efef751/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java new file mode 100644 index 0000000..36ca259 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.Group; +import org.apache.stratos.messaging.domain.topology.Status; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.GroupActivatedEvent; +import org.apache.stratos.messaging.event.topology.GroupInActivateEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor handles the topology GroupInActivateEvent: under the application write lock it validates the event against the topology and notifies event listeners; other message types go to the next processor + */ +public class GroupInActivateProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(GroupInActivateProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (GroupInActivateEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Type matched GroupInActivateEvent above, so the body is parsed as that same event class + GroupInActivateEvent event = (GroupInActivateEvent) Util. + jsonToObject(message, GroupInActivateEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. 
+ return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (GroupInActivateEvent event,Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroupRecursively(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + // Apply changes to the topology + //TODO the status update below is still commented out, so the topology is not modified here yet + // group.setStatus(Status.Activated); + if (log.isInfoEnabled()) { + log.info(String.format("Group updated as activated : %s", + group.getUniqueIdentifier())); + } + } + + // Notify event listeners; NOTE(review): this is also reached when the group was not found above - confirm that is intended + notifyEventListeners(event); + return true; + } +}
