Updated Branches:
  refs/heads/master a74b60ec4 -> c852a28ac

Introduced new functionality to filter topology events in topology processors 
to support dedicated load balancing for services


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/c852a28a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/c852a28a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/c852a28a

Branch: refs/heads/master
Commit: c852a28ac6a7de70afde43db2b794a325ed4ab75
Parents: a74b60e
Author: Imesh Gunaratne <[email protected]>
Authored: Wed Nov 20 22:29:11 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Wed Nov 20 22:29:11 2013 +0530

----------------------------------------------------------------------
 .../internal/LoadBalancerServiceComponent.java  | 16 +++-
 .../message/filter/topology/ServiceFilter.java  | 83 ++++++++++++++++++++
 .../topology/ClusterCreatedEventProcessor.java  | 12 +++
 .../topology/ClusterRemovedEventProcessor.java  | 13 +++
 .../CompleteTopologyEventProcessor.java         | 33 +++++++-
 .../topology/InstanceSpawnedEventProcessor.java | 13 ++-
 .../topology/MemberActivatedEventProcessor.java | 12 +++
 .../topology/MemberStartedEventProcessor.java   | 12 +++
 .../topology/MemberSuspendedEventProcessor.java | 12 +++
 .../MemberTerminatedEventProcessor.java         | 12 +++
 .../topology/ServiceCreatedEventProcessor.java  | 12 +++
 .../topology/ServiceRemovedEventProcessor.java  | 12 +++
 components/pom.xml                              | 36 ++++-----
 .../pom.xml                                     |  1 +
 pom.xml                                         |  4 +-
 .../distribution/src/main/bin/stratos.bat       |  2 +-
 .../distribution/src/main/bin/stratos.sh        |  1 +
 17 files changed, 262 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 304dd55..4d8a28e 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -26,6 +26,7 @@ import 
org.apache.stratos.load.balancer.LoadBalancerTopologyReceiver;
 import 
org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder;
 import org.apache.synapse.core.SynapseEnvironment;
@@ -103,10 +104,23 @@ public class LoadBalancerServiceComponent {
             topologyReceiver = new LoadBalancerTopologyReceiver();
             Thread topologyReceiverThread = new Thread(topologyReceiver);
             topologyReceiverThread.start();
-            if(log.isInfoEnabled()) {
+            if (log.isInfoEnabled()) {
                 log.info("Topology receiver thread started");
             }
 
+            if (log.isInfoEnabled()) {
+                if (ServiceFilter.getInstance().isActive()) {
+                    StringBuilder sb = new StringBuilder();
+                    for (String serviceName : 
ServiceFilter.getInstance().getIncludedServiceNames()) {
+                        if (sb.length() > 0) {
+                            sb.append(", ");
+                        }
+                        sb.append(serviceName);
+                    }
+                    log.info(String.format("Service filter activated: 
[services] %s", sb.toString()));
+                }
+            }
+
             activated = true;
             if (log.isDebugEnabled()) {
                 log.debug("LoadBalancerServiceComponent is activated ");

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
new file mode 100644
index 0000000..2c78a25
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.filter.topology;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A filter to discard topology events which are not in a given service name list.
+ * <p>
+ * The list is read once, when the single instance is created, from the comma separated
+ * system property {@code stratos.messaging.topology.service.filter}. When the property is
+ * blank no service names are registered and {@link #isActive()} returns false.
+ */
+public class ServiceFilter {
+    private static final Log log = LogFactory.getLog(ServiceFilter.class);
+    private static volatile ServiceFilter instance;
+
+    /** Included service names are the keys; the values are never read. */
+    private final Map<String, Boolean> serviceNameMap;
+
+    /**
+     * Reads the filter system property and registers each non-blank, trimmed entry.
+     */
+    private ServiceFilter() {
+        this.serviceNameMap = new HashMap<String, Boolean>();
+
+        String filter = System.getProperty("stratos.messaging.topology.service.filter");
+        if (StringUtils.isNotBlank(filter)) {
+            for (String item : filter.split(",")) {
+                // Trim each entry so that "a, b" matches service "b", and skip the empty
+                // entries produced by values such as "a,,b".
+                if (StringUtils.isNotBlank(item)) {
+                    serviceNameMap.put(item.trim(), true);
+                }
+            }
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Service filter initialized: [included] %s", filter));
+            }
+        }
+    }
+
+    /**
+     * Returns the single filter instance, creating it on first use.
+     *
+     * @return the shared service filter
+     */
+    public static ServiceFilter getInstance() {
+        // The volatile field is re-checked under the class lock, so the method itself does
+        // not need to be synchronized as well.
+        if (instance == null) {
+            synchronized (ServiceFilter.class) {
+                if (instance == null) {
+                    instance = new ServiceFilter();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Service filter object created");
+                    }
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * @return true if at least one service name is registered in the filter
+     */
+    public boolean isActive() {
+        return !serviceNameMap.isEmpty();
+    }
+
+    /**
+     * @param serviceName service name to look up
+     * @return true if the service name is registered in the filter
+     */
+    public boolean included(String serviceName) {
+        return serviceNameMap.containsKey(serviceName);
+    }
+
+    /**
+     * @param serviceName service name to look up
+     * @return true if the service name is not registered in the filter
+     */
+    public boolean excluded(String serviceName) {
+        return !included(serviceName);
+    }
+
+    /**
+     * @return a read-only view of the registered service names
+     */
+    public Collection<String> getIncludedServiceNames() {
+        // Wrapped so that callers cannot remove entries from the filter through the key set.
+        return java.util.Collections.unmodifiableSet(serviceNameMap.keySet());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
index 30e3e7e..0f9d9d3 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterCreatedEventProcessor extends MessageProcessor {
@@ -46,6 +47,17 @@ public class ClusterCreatedEventProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             ClusterCreatedEvent event = (ClusterCreatedEvent) 
Util.jsonToObject(message, ClusterCreatedEvent.class);
 
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                
if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire 
event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event properties
             if(StringUtils.isBlank(event.getHostName())) {
                 throw new RuntimeException("Hostname not found in cluster 
created event");

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
index 6cceb0b..e60172b 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterRemovedEventProcessor extends MessageProcessor {
@@ -44,6 +45,18 @@ public class ClusterRemovedEventProcessor extends 
MessageProcessor {
         if (ClusterRemovedEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             ClusterRemovedEvent event = (ClusterRemovedEvent) 
Util.jsonToObject(message, ClusterRemovedEvent.class);
+
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                
if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire 
event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event properties
             if(StringUtils.isBlank(event.getHostName())) {
                 throw new RuntimeException("Hostname not found in cluster 
removed event");

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
index 488a3eb..bcd52e5 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
@@ -20,9 +20,11 @@ package 
org.apache.stratos.messaging.message.processor.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class CompleteTopologyEventProcessor extends MessageProcessor {
@@ -42,7 +44,36 @@ public class CompleteTopologyEventProcessor extends 
MessageProcessor {
         if (CompleteTopologyEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             CompleteTopologyEvent event = (CompleteTopologyEvent) 
Util.jsonToObject(message, CompleteTopologyEvent.class);
-            topology.addServices(event.getTopology().getServices());
+
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                // Add services included in service filter
+                for(Service service : event.getTopology().getServices()) {
+                    
if(ServiceFilter.getInstance().included(service.getServiceName())) {
+                        topology.addService(service);
+                    }
+                }
+            }
+            else {
+                // Add all services
+                topology.addServices(event.getTopology().getServices());
+            }
+            if(log.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                for(Service service : topology.getServices()) {
+                    if(sb.length() > 0) {
+                        sb.append(", ");
+                    }
+                    sb.append(service.getServiceName());
+                }
+                if(sb.length() > 0) {
+                    log.debug(String.format("Services added: %s", 
sb.toString()));
+                }
+            }
+
+            // Add partitions
+            topology.addPartitions(event.getTopology().getPartitions());
+
             if (log.isInfoEnabled()) {
                 log.info("Topology initialized");
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
index 36ad335..518232d 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
@@ -22,8 +22,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class InstanceSpawnedEventProcessor extends MessageProcessor {
@@ -44,6 +44,17 @@ public class InstanceSpawnedEventProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             InstanceSpawnedEvent event = (InstanceSpawnedEvent) 
Util.jsonToObject(message, InstanceSpawnedEvent.class);
 
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                
if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire 
event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
index 563b2ab..81c00cf 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberActivatedEventProcessor extends MessageProcessor {
@@ -47,6 +48,17 @@ public class MemberActivatedEventProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             MemberActivatedEvent event = (MemberActivatedEvent) 
Util.jsonToObject(message, MemberActivatedEvent.class);
 
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                
if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire 
event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
index 9745c32..e0a3880 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberStartedEventProcessor extends MessageProcessor {
@@ -47,6 +48,17 @@ public class MemberStartedEventProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             MemberStartedEvent event = (MemberStartedEvent) 
Util.jsonToObject(message, MemberStartedEvent.class);
 
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                
if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire 
event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
index a4a5ef6..1cbb3b1 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberSuspendedEventProcessor extends MessageProcessor {
@@ -47,6 +48,17 @@ public class MemberSuspendedEventProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             MemberSuspendedEvent event = (MemberSuspendedEvent) 
Util.jsonToObject(message, MemberSuspendedEvent.class);
 
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                
if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire 
event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
index 87862fc..4c9e891 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberTerminatedEventProcessor extends MessageProcessor {
@@ -47,6 +48,17 @@ public class MemberTerminatedEventProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             MemberTerminatedEvent event = (MemberTerminatedEvent) 
Util.jsonToObject(message, MemberTerminatedEvent.class);
 
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                
if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire 
event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
index 8a63851..f2c8059 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java
@@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class ServiceCreatedEventProcessor extends MessageProcessor {
@@ -44,6 +45,17 @@ public class ServiceCreatedEventProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             ServiceCreatedEvent event = (ServiceCreatedEvent) 
Util.jsonToObject(message, ServiceCreatedEvent.class);
 
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                
if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire 
event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             if (topology.serviceExists(event.getServiceName())) {
                 throw new RuntimeException(String.format("Service already 
created: [service] %s", event.getServiceName()));

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
index 289d318..3859f60 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java
@@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class ServiceRemovedEventProcessor extends MessageProcessor {
@@ -44,6 +45,17 @@ public class ServiceRemovedEventProcessor extends MessageProcessor {
             // Parse complete message and build event
             ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
 
+            // Apply service filter
+            if(ServiceFilter.getInstance().isActive()) {
+                if(ServiceFilter.getInstance().excluded(event.getServiceName())) {
+                    // Service is excluded, do not update topology or fire event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index b97402e..afde350 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -253,28 +253,28 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <wso2carbon.version>4.1.0</wso2carbon.version>
-       <junit.version>4.11</junit.version>
-       <synapse.core.version>2.1.1-wso2v4</synapse.core.version>
-       <google.guava.wso2.version>12.0.0.wso2v1</google.guava.wso2.version>
-       <carbon.platform.version>4.1.0</carbon.platform.version>
-       
<version.equinox.osgi.services>3.3.100.v20120522-1822</version.equinox.osgi.services>
+        <junit.version>4.11</junit.version>
+        <synapse.core.version>2.1.1-wso2v4</synapse.core.version>
+        <google.guava.wso2.version>12.0.0.wso2v1</google.guava.wso2.version>
+        <carbon.platform.version>4.1.0</carbon.platform.version>
+        
<version.equinox.osgi.services>3.3.100.v20120522-1822</version.equinox.osgi.services>
         <version.equinox.osgi>3.8.1.v20120830-144521</version.equinox.osgi>
-       <axiom.osgi.version.range>[1.2.11.wso2v4, 
1.3.0)</axiom.osgi.version.range>
-        <version.tomcat>7.0.34</version.tomcat>        
-       <axis2.wso2.version>1.6.1.wso2v9</axis2.wso2.version>
-       <axis2.kernel.version>1.6.1-wso2v9</axis2.kernel.version>
+        <axiom.osgi.version.range>[1.2.11.wso2v4, 
1.3.0)</axiom.osgi.version.range>
+        <version.tomcat>7.0.34</version.tomcat>
+        <axis2.wso2.version>1.6.1.wso2v9</axis2.wso2.version>
+        <axis2.kernel.version>1.6.1-wso2v9</axis2.kernel.version>
         <version.log4j>1.2.17</version.log4j>
         <version.commons.logging>1.1.1</version.commons.logging>
-       
<orbit.version.geronimo-jms_1.1_spec>1.1.0.wso2v1</orbit.version.geronimo-jms_1.1_spec>
+        
<orbit.version.geronimo-jms_1.1_spec>1.1.0.wso2v1</orbit.version.geronimo-jms_1.1_spec>
         <google.guice.wso2.version>3.0.wso2v1</google.guice.wso2.version>
-       <commons-httpclient.version>3.0.1</commons-httpclient.version>
-       <json.wso2.version>2.0.0.wso2v1</json.wso2.version>
-       <axiom.wso2.version>1.2.11.wso2v4</axiom.wso2.version>
-       <orbit.version.axis2>1.6.1.wso2v5</orbit.version.axis2>
-       <orbit.version.axiom>1.2.11.wso2v3</orbit.version.axiom>
-       <neethi.wso2.version>2.0.4.wso2v4</neethi.wso2.version>
-       <imp.pkg.version.javax.servlet>[2.6.0, 
3.0.0)</imp.pkg.version.javax.servlet>
-       <rampart.wso2.version>1.6.1.wso2v9</rampart.wso2.version>
+        <commons-httpclient.version>3.0.1</commons-httpclient.version>
+        <json.wso2.version>2.0.0.wso2v1</json.wso2.version>
+        <axiom.wso2.version>1.2.11.wso2v4</axiom.wso2.version>
+        <orbit.version.axis2>1.6.1.wso2v5</orbit.version.axis2>
+        <orbit.version.axiom>1.2.11.wso2v3</orbit.version.axiom>
+        <neethi.wso2.version>2.0.4.wso2v4</neethi.wso2.version>
+        <imp.pkg.version.javax.servlet>[2.6.0, 
3.0.0)</imp.pkg.version.javax.servlet>
+        <rampart.wso2.version>1.6.1.wso2v9</rampart.wso2.version>
         <!--eclipse.osgi.version>3.5.0.v20090520</eclipse.osgi.version>
         
<eclipse.osgi.services.version>3.2.0.v20090520-1800</eclipse.osgi.services.version>
         
<equinox.commons.logging.version>1.0.4.v200706111724</equinox.commons.logging.version>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/features/messaging/org.apache.stratos.messaging.feature/pom.xml
----------------------------------------------------------------------
diff --git a/features/messaging/org.apache.stratos.messaging.feature/pom.xml 
b/features/messaging/org.apache.stratos.messaging.feature/pom.xml
index 363654e..e154f16 100644
--- a/features/messaging/org.apache.stratos.messaging.feature/pom.xml
+++ b/features/messaging/org.apache.stratos.messaging.feature/pom.xml
@@ -69,6 +69,7 @@
                             <bundles>
                                 
<bundleDef>org.apache.stratos:org.apache.stratos.messaging:${project.version}</bundleDef>
                                 
<bundleDef>com.google.code.gson:gson:${gson2.version}</bundleDef>
+                                
<bundleDef>org.apache.commons:commons-lang3:3.1</bundleDef>
                                 
<bundleDef>org.slf4j:slf4j-api:1.6.4</bundleDef>
                                 
<bundleDef>org.slf4j:slf4j-log4j12:1.6.4</bundleDef>
                             </bundles>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 13b1e6a..d1c6607 100644
--- a/pom.xml
+++ b/pom.xml
@@ -431,7 +431,7 @@
         <wso2carbon.version>4.1.0</wso2carbon.version>
         <carbon.platform.package.import.version.range>[4.1.0, 
4.2.0)</carbon.platform.package.import.version.range>
         
<carbon.platform.package.export.version>4.1.0</carbon.platform.package.export.version>
-       <axis2.osgi.version>1.6.1.wso2v9</axis2.osgi.version>
-       <jclouds.version>1.6.2-incubating</jclouds.version>
+        <axis2.osgi.version>1.6.1.wso2v9</axis2.osgi.version>
+        <jclouds.version>1.6.2-incubating</jclouds.version>
     </properties>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/products/load-balancer/modules/distribution/src/main/bin/stratos.bat
----------------------------------------------------------------------
diff --git 
a/products/load-balancer/modules/distribution/src/main/bin/stratos.bat 
b/products/load-balancer/modules/distribution/src/main/bin/stratos.bat
index aa4ae5f..5fe0d9a 100644
--- a/products/load-balancer/modules/distribution/src/main/bin/stratos.bat
+++ b/products/load-balancer/modules/distribution/src/main/bin/stratos.bat
@@ -154,7 +154,7 @@ set CARBON_CLASSPATH=.\lib;%CARBON_CLASSPATH%
 
 set 
JAVA_ENDORSED=".\lib\endorsed";"%JAVA_HOME%\jre\lib\endorsed";"%JAVA_HOME%\lib\endorsed"
 
-set CMD_LINE_ARGS=-Xbootclasspath/a:%CARBON_XBOOTCLASSPATH% -Xms1500m 
-Xmx3000m -XX:PermSize=256m -XX:MaxPermSize=512m -XX:+UseConcMarkSweepGC 
-XX:+UseParNewGC -XX:-UseGCOverheadLimit -XX:+CMSClassUnloadingEnabled 
-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=%CARBON_HOME%\repository\logs\heap-dump.hprof 
-Dcom.sun.management.jmxremote -classpath %CARBON_CLASSPATH% %JAVA_OPTS% 
-Djava.endorsed.dirs=%JAVA_ENDORSED% -Dcarbon.registry.root=/ 
-Dcarbon.home="%CARBON_HOME%" 
-Dloadbalancer.conf="file:///%CARBON_HOME%/repository/conf/loadbalancer.conf" 
-Dwso2.server.standalone=true -Djava.command="%JAVA_HOME%\bin\java" 
-Djava.opts="%JAVA_OPTS%" -Djava.io.tmpdir="%CARBON_HOME%\tmp" 
-Dcatalina.base="%CARBON_HOME%\lib\tomcat" 
-Dwso2.carbon.xml=%CARBON_HOME%\repository\conf\carbon.xml 
-Dwso2.registry.xml="%CARBON_HOME%\repository\conf\registry.xml" 
-Dwso2.user.mgt.xml="%CARBON_HOME%\repository\conf\user-mgt.xml" 
-Djava.util.logging.config.file="%CARBON_HOME%\repository\conf\log4j.properties" 
-Dcarbon.config.dir.path="%CARBON_HOME%\repository\conf" 
-Djndi.properties.dir="%CARBON_HOME%/repository/conf" 
-Dconf.location="%CARBON_HOME%\repository\conf" 
-Dcarbon.logs.path="%CARBON_HOME%\repository\logs" 
-Dcomponents.repo="%CARBON_HOME%\repository\components" 
-Dcom.atomikos.icatch.file="%CARBON_HOME%\lib\transactions.properties" 
-Dcom.atomikos.icatch.hide_init_file_path="true" 
-Dorg.apache.jasper.runtime.BodyContentImpl.LIMIT_BUFFER=true 
-Dcom.sun.jndi.ldap.connect.pool.authentication=simple 
-Dcom.sun.jndi.ldap.connect.pool.timeout=3000 
-Dorg.terracotta.quartz.skipUpdateCheck=true 
-Dcarbon.classpath=%CARBON_CLASSPATH% -Dfile.encoding=UTF8 
-Djavax.net.ssl.trustStore=$CARBON_HOME/repository/resources/security/client-truststore.jks
 -Djavax.net.ssl.trustStorePassword=wso2carbon -Dthrift.receiver.ip=localhost 
-Dthrift.receiver.port=7615
+set CMD_LINE_ARGS=-Xbootclasspath/a:%CARBON_XBOOTCLASSPATH% -Xms1500m 
-Xmx3000m -XX:PermSize=256m -XX:MaxPermSize=512m -XX:+UseConcMarkSweepGC 
-XX:+UseParNewGC -XX:-UseGCOverheadLimit -XX:+CMSClassUnloadingEnabled 
-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=%CARBON_HOME%\repository\logs\heap-dump.hprof 
-Dcom.sun.management.jmxremote -classpath %CARBON_CLASSPATH% %JAVA_OPTS% 
-Djava.endorsed.dirs=%JAVA_ENDORSED% -Dcarbon.registry.root=/ 
-Dcarbon.home="%CARBON_HOME%" 
-Dloadbalancer.conf="file:///%CARBON_HOME%/repository/conf/loadbalancer.conf" 
-Dwso2.server.standalone=true -Djava.command="%JAVA_HOME%\bin\java" 
-Djava.opts="%JAVA_OPTS%" -Djava.io.tmpdir="%CARBON_HOME%\tmp" 
-Dcatalina.base="%CARBON_HOME%\lib\tomcat" 
-Dwso2.carbon.xml=%CARBON_HOME%\repository\conf\carbon.xml 
-Dwso2.registry.xml="%CARBON_HOME%\repository\conf\registry.xml" 
-Dwso2.user.mgt.xml="%CARBON_HOME%\repository\conf\user-mgt.xml" 
-Djava.util.logging.config.file="%CARBON_HOME%\repository\conf\log4j.properties" 
-Dcarbon.config.dir.path="%CARBON_HOME%\repository\conf" 
-Djndi.properties.dir="%CARBON_HOME%/repository/conf" 
-Dconf.location="%CARBON_HOME%\repository\conf" 
-Dcarbon.logs.path="%CARBON_HOME%\repository\logs" 
-Dcomponents.repo="%CARBON_HOME%\repository\components" 
-Dcom.atomikos.icatch.file="%CARBON_HOME%\lib\transactions.properties" 
-Dcom.atomikos.icatch.hide_init_file_path="true" 
-Dorg.apache.jasper.runtime.BodyContentImpl.LIMIT_BUFFER=true 
-Dcom.sun.jndi.ldap.connect.pool.authentication=simple 
-Dcom.sun.jndi.ldap.connect.pool.timeout=3000 
-Dorg.terracotta.quartz.skipUpdateCheck=true 
-Dcarbon.classpath=%CARBON_CLASSPATH% -Dfile.encoding=UTF8 
-Djavax.net.ssl.trustStore=$CARBON_HOME/repository/resources/security/client-truststore.jks
 -Djavax.net.ssl.trustStorePassword=wso2carbon -Dthrift.receiver.ip=localhost 
-Dthrift.receiver.port=7615 -Dstratos.messaging.topology.service.filter=""
 
 :runJava
 echo JAVA_HOME environment variable is set to %JAVA_HOME%

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/products/load-balancer/modules/distribution/src/main/bin/stratos.sh
----------------------------------------------------------------------
diff --git 
a/products/load-balancer/modules/distribution/src/main/bin/stratos.sh 
b/products/load-balancer/modules/distribution/src/main/bin/stratos.sh
index 85dbd51..1e17dcc 100644
--- a/products/load-balancer/modules/distribution/src/main/bin/stratos.sh
+++ b/products/load-balancer/modules/distribution/src/main/bin/stratos.sh
@@ -298,6 +298,7 @@ exec "$JAVACMD" \
         -Djavax.net.ssl.trustStorePassword=wso2carbon \
         -Dthrift.receiver.ip=localhost \
         -Dthrift.receiver.port=7615 \
+        -Dstratos.messaging.topology.service.filter="" \
         org.wso2.carbon.bootstrap.Bootstrap $*
 
 

Reply via email to