Repository: stratos
Updated Branches:
  refs/heads/4.0.0-grouping 14f5fe030 -> c98411b61


adding clusters created event and handling it in autoscaler


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c98411b6
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c98411b6
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c98411b6

Branch: refs/heads/4.0.0-grouping
Commit: c98411b6178670e89bc74971f71e8db4fd85445e
Parents: 14f5fe0
Author: reka <[email protected]>
Authored: Mon Nov 3 12:19:10 2014 +0530
Committer: reka <[email protected]>
Committed: Mon Nov 3 12:19:10 2014 +0530

----------------------------------------------------------------------
 .../applications/topic/ApplicationBuilder.java  |   2 +
 .../AutoscalerTopologyEventReceiver.java        |  20 +--
 .../ApplicationClustersCreatedEvent.java        |  45 ++++++
 ...ApplicationClustersCreatedEventListener.java |  27 ++++
 ...licationClustersCreatedMessageProcessor.java | 139 +++++++++++++++++++
 .../topology/TopologyMessageProcessorChain.java |   6 +
 6 files changed, 229 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
index bf0ad01..7e826f0 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
@@ -25,6 +25,7 @@ import 
org.apache.stratos.autoscaler.applications.ApplicationHolder;
 import 
org.apache.stratos.autoscaler.applications.pojo.ApplicationClusterContext;
 import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
 import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
+import 
org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher;
 import org.apache.stratos.autoscaler.monitor.ApplicationMonitorFactory;
 import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
 import org.apache.stratos.messaging.domain.applications.*;
@@ -171,6 +172,7 @@ public class ApplicationBuilder {
         try {
             if (applications.getApplication(application.getUniqueIdentifier()) 
!= null) {
                 ApplicationHolder.persistApplication(application);
+                //TODO cloud controller client and register clusters
                // startApplicationMonitor(application.getUniqueIdentifier());
             } else {
                 log.warn("Application [ " + application.getUniqueIdentifier() 
+ " ] already exists in Applications");

http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 72dc6e5..d79d7f0 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -167,26 +167,26 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         });
 
 
-        /*topologyEventReceiver.addEventListener(new 
ApplicationCreatedEventListener() {
+        topologyEventReceiver.addEventListener(new 
ApplicationClustersCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    log.info("[ApplicationCreatedEvent] Received: " + 
event.getClass());
-                    ApplicationCreatedEvent applicationCreatedEvent = 
(ApplicationCreatedEvent) event;
+                    log.info("[ApplicationClustersCreatedEvent] Received: " + 
event.getClass());
+                    ApplicationClustersCreatedEvent 
applicationClustersCreatedEvent =
+                                                            
(ApplicationClustersCreatedEvent) event;
+                    String appId = applicationClustersCreatedEvent.getAppId();
                     try {
-
                         //acquire read lock
-                        TopologyManager.acquireReadLockForApplication(
-                                
applicationCreatedEvent.getApplication().getUniqueIdentifier());
+                        ApplicationHolder.acquireReadLock();
                         //start the application monitor
-                        
startApplicationMonitor(applicationCreatedEvent.getApplication().getUniqueIdentifier());
+                        startApplicationMonitor(appId);
                     } catch (Exception e) {
                         String msg = "Error processing event " + 
e.getLocalizedMessage();
                         log.error(msg, e);
                     } finally {
                         //release read lock
-                        TopologyManager.releaseReadLockForApplication(
-                                
applicationCreatedEvent.getApplication().getUniqueIdentifier());
+                        ApplicationHolder.releaseReadLock();
+
                     }
                 } catch (ClassCastException e) {
                     String msg = "Error while casting the event " + 
e.getLocalizedMessage();
@@ -194,7 +194,7 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                 }
 
             }
-        });*/
+        });
 
         topologyEventReceiver.addEventListener(new 
ClusterActivatedEventListener() {
             @Override

http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersCreatedEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersCreatedEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersCreatedEvent.java
new file mode 100644
index 0000000..368dabf
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersCreatedEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.domain.topology.Cluster;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Topology event that carries the clusters associated with an application,
+ * together with the id of that application.
+ */
+public class ApplicationClustersCreatedEvent extends TopologyEvent implements Serializable {
+    // Explicit version id so the serialized form does not depend on a
+    // compiler-generated value that changes whenever the class is touched.
+    private static final long serialVersionUID = 1L;
+
+    private final List<Cluster> clusterList;
+    private final String appId;
+
+    /**
+     * @param clusters clusters associated with the application; the list is
+     *                 stored as given, it is not copied
+     * @param appId    id of the application the clusters belong to
+     */
+    public ApplicationClustersCreatedEvent(List<Cluster> clusters, String appId) {
+        this.clusterList = clusters;
+        this.appId = appId;
+    }
+
+    /**
+     * @return the cluster list handed to the constructor (same instance)
+     */
+    public List<Cluster> getClusterList() {
+        return clusterList;
+    }
+
+    /**
+     * @return the id of the application the clusters belong to
+     */
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersCreatedEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersCreatedEventListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersCreatedEventListener.java
new file mode 100644
index 0000000..4dda06d
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersCreatedEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Listener type for the event raised once the clusters of an application
+ * have been created. It adds no members of its own; subclasses supply the
+ * handling logic.
+ */
+public abstract class ApplicationClustersCreatedEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
new file mode 100644
index 0000000..3780db5
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import 
org.apache.stratos.messaging.event.topology.ApplicationClustersCreatedEvent;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import 
org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.Util;
+
+import java.util.List;
+
+/**
+ * Handles {@code ApplicationClustersCreatedEvent} messages: every cluster
+ * carried by the event is added to its service in the topology, after which
+ * the registered event listeners are notified. Messages of any other type
+ * are handed to the next processor in the chain.
+ */
+public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ApplicationClustersCreatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Processes the message if its type is {@code ApplicationClustersCreatedEvent},
+     * otherwise delegates to the next processor.
+     *
+     * @param type    fully qualified class name of the event the message carries
+     * @param message JSON body of the event
+     * @param object  the topology to update; cast to {@link Topology}
+     * @return {@code false} if the topology is not initialized yet or the event
+     *         could not be applied, otherwise the result of the processing
+     * @throws RuntimeException if the type is not handled here and no next
+     *                          processor has been set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+        if (ApplicationClustersCreatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized()) {
+                return false;
+            }
+
+            // Parse complete message and build event
+            ApplicationClustersCreatedEvent event = (ApplicationClustersCreatedEvent)
+                    Util.jsonToObject(message, ApplicationClustersCreatedEvent.class);
+            return doProcess(event, topology);
+        }
+
+        if (nextProcessor != null) {
+            // ask the next processor to take care of the message.
+            return nextProcessor.process(type, message, topology);
+        }
+        throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+    }
+
+    /**
+     * Applies all clusters of the event to the topology and notifies the
+     * event listeners.
+     *
+     * @return {@code false} if a cluster refers to a service that does not
+     *         exist in the topology (processing stops there and listeners are
+     *         not notified), {@code true} otherwise
+     */
+    private boolean doProcess(ApplicationClustersCreatedEvent event, Topology topology) {
+        List<Cluster> clusters = event.getClusterList();
+        if (clusters == null) {
+            // An event without a cluster list is treated like one with an empty
+            // list instead of failing with a NullPointerException.
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("No cluster list found in event: [application] %s",
+                        event.getAppId()));
+            }
+        } else {
+            for (Cluster cluster : clusters) {
+                if (!addClusterToTopology(cluster, topology)) {
+                    return false;
+                }
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+
+    /**
+     * Adds a single cluster to its service while holding the cluster write lock.
+     * A cluster rejected by the service or cluster filter is skipped so that it
+     * does not prevent the remaining clusters of the event from being applied.
+     *
+     * @return {@code false} only if the service of the cluster does not exist
+     *         in the topology; {@code true} if the cluster was added, skipped
+     *         by a filter, or already present
+     */
+    private boolean addClusterToTopology(Cluster cluster, Topology topology) {
+        String serviceName = cluster.getServiceName();
+        String clusterId = cluster.getClusterId();
+        // NOTE(review): arguments are passed as (clusterId, serviceName) - confirm
+        // this matches the parameter order declared by TopologyUpdater.
+        TopologyUpdater.acquireWriteLockForCluster(clusterId, serviceName);
+        try {
+            // Apply service filter
+            if (TopologyServiceFilter.getInstance().isActive()
+                    && TopologyServiceFilter.getInstance().serviceNameExcluded(serviceName)) {
+                // Service is excluded, do not update topology for this cluster
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", serviceName));
+                }
+                return true;
+            }
+
+            // Apply cluster filter
+            if (TopologyClusterFilter.getInstance().isActive()
+                    && TopologyClusterFilter.getInstance().clusterIdExcluded(clusterId)) {
+                // Cluster is excluded, do not update topology for this cluster
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", clusterId));
+                }
+                return true;
+            }
+
+            // Validate event against the existing topology
+            Service service = topology.getService(serviceName);
+            if (service == null) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Service does not exist: [service] %s", serviceName));
+                }
+                return false;
+            }
+
+            if (service.clusterExists(clusterId)) {
+                if (log.isWarnEnabled()) {
+                    log.warn(String.format("Cluster already exists in service: [service] %s " +
+                            "[cluster] %s", serviceName, clusterId));
+                }
+            } else {
+                // Apply changes to the topology
+                service.addCluster(cluster);
+                cluster.setStatus(ClusterStatus.Created);
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Cluster created: %s", cluster.toString()));
+                }
+            }
+            return true;
+        } finally {
+            TopologyUpdater.releaseWriteLockForCluster(clusterId, serviceName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/c98411b6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index f5ada89..1a84c8c 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -35,6 +35,7 @@ public class TopologyMessageProcessorChain extends 
MessageProcessorChain {
     private CompleteTopologyMessageProcessor completeTopologyMessageProcessor;
     private ServiceCreatedMessageProcessor serviceCreatedMessageProcessor;
     private ServiceRemovedMessageProcessor serviceRemovedMessageProcessor;
+    private ApplicationClustersCreatedMessageProcessor 
clustersCreatedMessageProcessor;
     private ClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
     private ClusterResetMessageProcessor clusterResetMessageProcessor;
     private ClusterActivatedProcessor clusterActivatedProcessor;
@@ -61,6 +62,9 @@ public class TopologyMessageProcessorChain extends 
MessageProcessorChain {
         serviceRemovedMessageProcessor = new ServiceRemovedMessageProcessor();
         add(serviceRemovedMessageProcessor);
 
+        clustersCreatedMessageProcessor = new 
ApplicationClustersCreatedMessageProcessor();
+        add(clustersCreatedMessageProcessor);
+
         clusterCreatedMessageProcessor = new ClusterCreatedMessageProcessor();
         add(clusterCreatedMessageProcessor);
 
@@ -113,6 +117,8 @@ public class TopologyMessageProcessorChain extends 
MessageProcessorChain {
             completeTopologyMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ClusterCreatedEventListener) {
             clusterCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof 
ApplicationClustersCreatedEventListener) {
+            clustersCreatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ClusterActivatedEventListener) {
             clusterActivatedProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ClusterInActivateEventListener) {

Reply via email to