Repository: stratos Updated Branches: refs/heads/4.0.0-grouping 14f5fe030 -> c98411b61
adding clusters created event and handling it in autoscaler Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c98411b6 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c98411b6 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c98411b6 Branch: refs/heads/4.0.0-grouping Commit: c98411b6178670e89bc74971f71e8db4fd85445e Parents: 14f5fe0 Author: reka <[email protected]> Authored: Mon Nov 3 12:19:10 2014 +0530 Committer: reka <[email protected]> Committed: Mon Nov 3 12:19:10 2014 +0530 ---------------------------------------------------------------------- .../applications/topic/ApplicationBuilder.java | 2 + .../AutoscalerTopologyEventReceiver.java | 20 +-- .../ApplicationClustersCreatedEvent.java | 45 ++++++ ...ApplicationClustersCreatedEventListener.java | 27 ++++ ...licationClustersCreatedMessageProcessor.java | 139 +++++++++++++++++++ .../topology/TopologyMessageProcessorChain.java | 6 + 6 files changed, 229 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java index bf0ad01..7e826f0 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java @@ -25,6 +25,7 @@ import org.apache.stratos.autoscaler.applications.ApplicationHolder; import org.apache.stratos.autoscaler.applications.pojo.ApplicationClusterContext; import org.apache.stratos.autoscaler.exception.DependencyBuilderException; import org.apache.stratos.autoscaler.exception.TopologyInConsistentException; +import org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.monitor.ApplicationMonitorFactory; import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; import org.apache.stratos.messaging.domain.applications.*; @@ -171,6 +172,7 @@ public class ApplicationBuilder { try { if (applications.getApplication(application.getUniqueIdentifier()) != null) { ApplicationHolder.persistApplication(application); + //TODO cloud controller client and register clusters // startApplicationMonitor(application.getUniqueIdentifier()); } else { log.warn("Application [ " + application.getUniqueIdentifier() + " ] already exists in Applications"); http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index 72dc6e5..d79d7f0 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -167,26 +167,26 @@ public class AutoscalerTopologyEventReceiver implements Runnable { }); - /*topologyEventReceiver.addEventListener(new ApplicationCreatedEventListener() { + topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() { @Override protected void onEvent(Event event) { try { - log.info("[ApplicationCreatedEvent] Received: " + event.getClass()); - ApplicationCreatedEvent applicationCreatedEvent = (ApplicationCreatedEvent) event; + log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass()); + ApplicationClustersCreatedEvent applicationClustersCreatedEvent = + (ApplicationClustersCreatedEvent) event; + String appId = applicationClustersCreatedEvent.getAppId(); try { - //acquire read lock - TopologyManager.acquireReadLockForApplication( - applicationCreatedEvent.getApplication().getUniqueIdentifier()); + ApplicationHolder.acquireReadLock(); //start the application monitor - startApplicationMonitor(applicationCreatedEvent.getApplication().getUniqueIdentifier()); + startApplicationMonitor(appId); } catch (Exception e) { String msg = "Error processing event " + e.getLocalizedMessage(); log.error(msg, e); } finally { //release read lock - TopologyManager.releaseReadLockForApplication( - applicationCreatedEvent.getApplication().getUniqueIdentifier()); + ApplicationHolder.releaseReadLock(); + } } catch (ClassCastException e) { String msg = "Error while casting the event " + e.getLocalizedMessage(); @@ -194,7 +194,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } } - });*/ + }); topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() { @Override http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersCreatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersCreatedEvent.java new file mode 100644 index 0000000..368dabf --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersCreatedEvent.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.topology; + +import org.apache.stratos.messaging.domain.topology.Cluster; + +import java.io.Serializable; +import java.util.List; + +/** + * This will have the list of clusters which associated with an application + */ +public class ApplicationClustersCreatedEvent extends TopologyEvent implements Serializable { + private List<Cluster> clusterList; + private String appId; + + public ApplicationClustersCreatedEvent(List<Cluster> clusters, String appId) { + this.clusterList = clusters; + this.appId = appId; + } + + public List<Cluster> getClusterList() { + return clusterList; + } + + public String getAppId() { + return appId; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersCreatedEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersCreatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersCreatedEventListener.java new file mode 100644 index 0000000..4dda06d --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersCreatedEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.topology; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * This will get triggered when clusters created for an application. + */ +abstract public class ApplicationClustersCreatedEventListener extends EventListener{ +} http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java new file mode 100644 index 0000000..3780db5 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ApplicationClustersCreatedEvent; +import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.util.Util; + +import java.util.List; + +/** + * This will process the clusters and add them to relevant service. + */ +public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ApplicationClustersCreatedMessageProcessor.class); + private MessageProcessor nextProcessor; + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + if (ApplicationClustersCreatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) { + return false; + } + + // Parse complete message and build event + ApplicationClustersCreatedEvent event = (ApplicationClustersCreatedEvent) Util. + jsonToObject(message, ApplicationClustersCreatedEvent.class); + return doProcess(event, topology); + + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ApplicationClustersCreatedEvent event,Topology topology) { + List<Cluster> clusters = event.getClusterList(); + + for(Cluster cluster : clusters) { + String serviceName = cluster.getServiceName(); + String clusterId = cluster.getClusterId(); + TopologyUpdater.acquireWriteLockForCluster(clusterId, serviceName); + try { + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(serviceName)) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", serviceName)); + } + return false; + } + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(clusterId)) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", clusterId)); + } + return false; + } + } + + // Validate event against the existing topology + Service service = topology.getService(serviceName); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + serviceName)); + } + return false; + } + if (service.clusterExists(clusterId)) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster already exists in service: [service] %s " + + "[cluster] %s",serviceName , + clusterId)); + } + } else { + + // Apply changes to the topology + service.addCluster(cluster); + cluster.setStatus(ClusterStatus.Created); + if (log.isInfoEnabled()) { + log.info(String.format("Cluster created: %s", + cluster.toString())); + } + } + } finally { + TopologyUpdater.releaseWriteLockForCluster(clusterId, serviceName); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index f5ada89..1a84c8c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -35,6 +35,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { private CompleteTopologyMessageProcessor completeTopologyMessageProcessor; private ServiceCreatedMessageProcessor serviceCreatedMessageProcessor; private ServiceRemovedMessageProcessor serviceRemovedMessageProcessor; + private ApplicationClustersCreatedMessageProcessor clustersCreatedMessageProcessor; private ClusterCreatedMessageProcessor clusterCreatedMessageProcessor; private ClusterResetMessageProcessor clusterResetMessageProcessor; private ClusterActivatedProcessor clusterActivatedProcessor; @@ -61,6 +62,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { serviceRemovedMessageProcessor = new ServiceRemovedMessageProcessor(); add(serviceRemovedMessageProcessor); + clustersCreatedMessageProcessor = new ApplicationClustersCreatedMessageProcessor(); + add(clustersCreatedMessageProcessor); + clusterCreatedMessageProcessor = new ClusterCreatedMessageProcessor(); add(clusterCreatedMessageProcessor); @@ -113,6 +117,8 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { completeTopologyMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ClusterCreatedEventListener) { clusterCreatedMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof ApplicationClustersCreatedEventListener) { + clustersCreatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ClusterActivatedEventListener) { clusterActivatedProcessor.addEventListener(eventListener); } else if (eventListener instanceof ClusterInActivateEventListener) {
