Updated Branches: refs/heads/master a74b60ec4 -> c852a28ac
Introduced new functionality to filter topology events in topology processors to support dedicated load balancing for services Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/c852a28a Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/c852a28a Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/c852a28a Branch: refs/heads/master Commit: c852a28ac6a7de70afde43db2b794a325ed4ab75 Parents: a74b60e Author: Imesh Gunaratne <[email protected]> Authored: Wed Nov 20 22:29:11 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Wed Nov 20 22:29:11 2013 +0530 ---------------------------------------------------------------------- .../internal/LoadBalancerServiceComponent.java | 16 +++- .../message/filter/topology/ServiceFilter.java | 83 ++++++++++++++++++++ .../topology/ClusterCreatedEventProcessor.java | 12 +++ .../topology/ClusterRemovedEventProcessor.java | 13 +++ .../CompleteTopologyEventProcessor.java | 33 +++++++- .../topology/InstanceSpawnedEventProcessor.java | 13 ++- .../topology/MemberActivatedEventProcessor.java | 12 +++ .../topology/MemberStartedEventProcessor.java | 12 +++ .../topology/MemberSuspendedEventProcessor.java | 12 +++ .../MemberTerminatedEventProcessor.java | 12 +++ .../topology/ServiceCreatedEventProcessor.java | 12 +++ .../topology/ServiceRemovedEventProcessor.java | 12 +++ components/pom.xml | 36 ++++----- .../pom.xml | 1 + pom.xml | 4 +- .../distribution/src/main/bin/stratos.bat | 2 +- .../distribution/src/main/bin/stratos.sh | 1 + 17 files changed, 262 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index 304dd55..4d8a28e 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -26,6 +26,7 @@ import org.apache.stratos.load.balancer.LoadBalancerTopologyReceiver; import org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.synapse.config.SynapseConfiguration; import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder; import org.apache.synapse.core.SynapseEnvironment; @@ -103,10 +104,23 @@ public class LoadBalancerServiceComponent { topologyReceiver = new LoadBalancerTopologyReceiver(); Thread topologyReceiverThread = new Thread(topologyReceiver); topologyReceiverThread.start(); - if(log.isInfoEnabled()) { + if (log.isInfoEnabled()) { log.info("Topology receiver thread started"); } + if (log.isInfoEnabled()) { + if (ServiceFilter.getInstance().isActive()) { + StringBuilder sb = new StringBuilder(); + for (String serviceName : ServiceFilter.getInstance().getIncludedServiceNames()) { + if (sb.length() > 0) { + sb.append(", "); + } + sb.append(serviceName); + } + log.info(String.format("Service filter activated: [services] %s", sb.toString())); + } + } + activated = true; if (log.isDebugEnabled()) { log.debug("LoadBalancerServiceComponent is activated "); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java new file mode 100644 index 0000000..2c78a25 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.filter.topology; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * A filter to discard topology events which are not in a given service name list. + */ +public class ServiceFilter { + private static final Log log = LogFactory.getLog(ServiceFilter.class); + private static volatile ServiceFilter instance; + + private Map<String, Boolean> serviceNameMap; + + private ServiceFilter() { + this.serviceNameMap = new HashMap<String, Boolean>(); + + String filter = System.getProperty("stratos.messaging.topology.service.filter"); + if(StringUtils.isNotBlank(filter)) { + String[] array = filter.split(","); + for(String item : array) { + serviceNameMap.put(item, true); + } + if(log.isDebugEnabled()) { + log.debug(String.format("Service filter initialized: [included] %s", filter)); + } + } + } + + public static synchronized ServiceFilter getInstance() { + if (instance == null) { + synchronized (ServiceFilter.class){ + if (instance == null) { + instance = new ServiceFilter(); + if(log.isDebugEnabled()) { + log.debug("Service filter object created"); + } + } + } + } + return instance; + } + + public boolean isActive() { + return serviceNameMap.size() > 0; + } + + public boolean included(String serviceName) { + return serviceNameMap.containsKey(serviceName); + } + + public boolean excluded(String serviceName) { + return !serviceNameMap.containsKey(serviceName); + } + + public Collection<String> getIncludedServiceNames() { + return serviceNameMap.keySet(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java index 30e3e7e..0f9d9d3 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java @@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class ClusterCreatedEventProcessor extends MessageProcessor { @@ -46,6 +47,17 @@ public class ClusterCreatedEventProcessor extends MessageProcessor { // Parse complete message and build event ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class); + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + if(ServiceFilter.getInstance().excluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if(log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return true; + } + } + // Validate event properties if(StringUtils.isBlank(event.getHostName())) { throw new RuntimeException("Hostname not found in cluster created event"); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java index 6cceb0b..e60172b 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class ClusterRemovedEventProcessor extends MessageProcessor { @@ -44,6 +45,18 @@ public class ClusterRemovedEventProcessor extends MessageProcessor { if (ClusterRemovedEvent.class.getName().equals(type)) { // Parse complete message and build event ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class); + + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + if(ServiceFilter.getInstance().excluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if(log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return true; + } + } + // Validate event properties if(StringUtils.isBlank(event.getHostName())) { throw new RuntimeException("Hostname not found in cluster removed event"); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java index 488a3eb..bcd52e5 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java @@ -20,9 +20,11 @@ package org.apache.stratos.messaging.message.processor.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class CompleteTopologyEventProcessor extends MessageProcessor { @@ -42,7 +44,36 @@ public class CompleteTopologyEventProcessor extends MessageProcessor { if (CompleteTopologyEvent.class.getName().equals(type)) { // Parse complete message and build event CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class); - topology.addServices(event.getTopology().getServices()); + + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + // Add services included in service filter + for(Service service : event.getTopology().getServices()) { + if(ServiceFilter.getInstance().included(service.getServiceName())) { + topology.addService(service); + } + } + } + else { + // Add all services + topology.addServices(event.getTopology().getServices()); + } + if(log.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + for(Service service : topology.getServices()) { + if(sb.length() > 0) { + sb.append(", "); + } + sb.append(service.getServiceName()); + } + if(sb.length() > 0) { + log.debug(String.format("Services added: %s", sb.toString())); + } + } + + // Add partitions + topology.addPartitions(event.getTopology().getPartitions()); + if (log.isInfoEnabled()) { log.info("Topology initialized"); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java index 36ad335..518232d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java @@ -22,8 +22,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent; -import org.apache.stratos.messaging.event.topology.MemberStartedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class InstanceSpawnedEventProcessor extends MessageProcessor { @@ -44,6 +44,17 @@ public class InstanceSpawnedEventProcessor extends MessageProcessor { // Parse complete message and build event InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class); + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + if(ServiceFilter.getInstance().excluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if(log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return true; + } + } + // Validate event against the existing topology Service service = topology.getService(event.getServiceName()); if (service == null) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java index 563b2ab..81c00cf 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java @@ -27,6 +27,7 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class MemberActivatedEventProcessor extends MessageProcessor { @@ -47,6 +48,17 @@ public class MemberActivatedEventProcessor extends MessageProcessor { // Parse complete message and build event MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class); + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + if(ServiceFilter.getInstance().excluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if(log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return true; + } + } + // Validate event against the existing topology Service service = topology.getService(event.getServiceName()); if (service == null) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java index 9745c32..e0a3880 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java @@ -27,6 +27,7 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.MemberStartedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class MemberStartedEventProcessor extends MessageProcessor { @@ -47,6 +48,17 @@ public class MemberStartedEventProcessor extends MessageProcessor { // Parse complete message and build event MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class); + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + if(ServiceFilter.getInstance().excluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if(log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return true; + } + } + // Validate event against the existing topology Service service = topology.getService(event.getServiceName()); if (service == null) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java index a4a5ef6..1cbb3b1 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java @@ -27,6 +27,7 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class MemberSuspendedEventProcessor extends MessageProcessor { @@ -47,6 +48,17 @@ public class MemberSuspendedEventProcessor extends MessageProcessor { // Parse complete message and build event MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class); + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + if(ServiceFilter.getInstance().excluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if(log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return true; + } + } + // Validate event against the existing topology Service service = topology.getService(event.getServiceName()); if (service == null) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java index 87862fc..4c9e891 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java @@ -27,6 +27,7 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class MemberTerminatedEventProcessor extends MessageProcessor { @@ -47,6 +48,17 @@ public class MemberTerminatedEventProcessor extends MessageProcessor { // Parse complete message and build event MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class); + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + if(ServiceFilter.getInstance().excluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if(log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return true; + } + } + // Validate event against the existing topology Service service = topology.getService(event.getServiceName()); if (service == null) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java index 8a63851..f2c8059 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedEventProcessor.java @@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class ServiceCreatedEventProcessor extends MessageProcessor { @@ -44,6 +45,17 @@ public class ServiceCreatedEventProcessor extends MessageProcessor { // Parse complete message and build event ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class); + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + if(ServiceFilter.getInstance().excluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if(log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return true; + } + } + // Validate event against the existing topology if (topology.serviceExists(event.getServiceName())) { throw new RuntimeException(String.format("Service already created: [service] %s", event.getServiceName())); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java index 289d318..3859f60 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedEventProcessor.java @@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.filter.topology.ServiceFilter; import org.apache.stratos.messaging.util.Util; public class ServiceRemovedEventProcessor extends MessageProcessor { @@ -44,6 +45,17 @@ public class ServiceRemovedEventProcessor extends MessageProcessor { // Parse complete message and build event ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class); + // Apply service filter + if(ServiceFilter.getInstance().isActive()) { + if(ServiceFilter.getInstance().excluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if(log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return true; + } + } + // Validate event against the existing topology Service service = topology.getService(event.getServiceName()); if (service == null) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/components/pom.xml ---------------------------------------------------------------------- diff --git a/components/pom.xml b/components/pom.xml index b97402e..afde350 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -253,28 +253,28 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <wso2carbon.version>4.1.0</wso2carbon.version> - <junit.version>4.11</junit.version> - <synapse.core.version>2.1.1-wso2v4</synapse.core.version> - <google.guava.wso2.version>12.0.0.wso2v1</google.guava.wso2.version> - <carbon.platform.version>4.1.0</carbon.platform.version> - <version.equinox.osgi.services>3.3.100.v20120522-1822</version.equinox.osgi.services> + <junit.version>4.11</junit.version> + <synapse.core.version>2.1.1-wso2v4</synapse.core.version> + <google.guava.wso2.version>12.0.0.wso2v1</google.guava.wso2.version> + <carbon.platform.version>4.1.0</carbon.platform.version> + <version.equinox.osgi.services>3.3.100.v20120522-1822</version.equinox.osgi.services> <version.equinox.osgi>3.8.1.v20120830-144521</version.equinox.osgi> - <axiom.osgi.version.range>[1.2.11.wso2v4, 1.3.0)</axiom.osgi.version.range> - <version.tomcat>7.0.34</version.tomcat> - <axis2.wso2.version>1.6.1.wso2v9</axis2.wso2.version> - <axis2.kernel.version>1.6.1-wso2v9</axis2.kernel.version> + <axiom.osgi.version.range>[1.2.11.wso2v4, 1.3.0)</axiom.osgi.version.range> + <version.tomcat>7.0.34</version.tomcat> + <axis2.wso2.version>1.6.1.wso2v9</axis2.wso2.version> + <axis2.kernel.version>1.6.1-wso2v9</axis2.kernel.version> <version.log4j>1.2.17</version.log4j> <version.commons.logging>1.1.1</version.commons.logging> - <orbit.version.geronimo-jms_1.1_spec>1.1.0.wso2v1</orbit.version.geronimo-jms_1.1_spec> + <orbit.version.geronimo-jms_1.1_spec>1.1.0.wso2v1</orbit.version.geronimo-jms_1.1_spec> <google.guice.wso2.version>3.0.wso2v1</google.guice.wso2.version> - <commons-httpclient.version>3.0.1</commons-httpclient.version> - <json.wso2.version>2.0.0.wso2v1</json.wso2.version> - <axiom.wso2.version>1.2.11.wso2v4</axiom.wso2.version> - <orbit.version.axis2>1.6.1.wso2v5</orbit.version.axis2> - <orbit.version.axiom>1.2.11.wso2v3</orbit.version.axiom> - <neethi.wso2.version>2.0.4.wso2v4</neethi.wso2.version> - <imp.pkg.version.javax.servlet>[2.6.0, 3.0.0)</imp.pkg.version.javax.servlet> - <rampart.wso2.version>1.6.1.wso2v9</rampart.wso2.version> + <commons-httpclient.version>3.0.1</commons-httpclient.version> + <json.wso2.version>2.0.0.wso2v1</json.wso2.version> + <axiom.wso2.version>1.2.11.wso2v4</axiom.wso2.version> + <orbit.version.axis2>1.6.1.wso2v5</orbit.version.axis2> + <orbit.version.axiom>1.2.11.wso2v3</orbit.version.axiom> + <neethi.wso2.version>2.0.4.wso2v4</neethi.wso2.version> + <imp.pkg.version.javax.servlet>[2.6.0, 3.0.0)</imp.pkg.version.javax.servlet> + <rampart.wso2.version>1.6.1.wso2v9</rampart.wso2.version> <!--eclipse.osgi.version>3.5.0.v20090520</eclipse.osgi.version> <eclipse.osgi.services.version>3.2.0.v20090520-1800</eclipse.osgi.services.version> <equinox.commons.logging.version>1.0.4.v200706111724</equinox.commons.logging.version> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/features/messaging/org.apache.stratos.messaging.feature/pom.xml ---------------------------------------------------------------------- diff --git a/features/messaging/org.apache.stratos.messaging.feature/pom.xml b/features/messaging/org.apache.stratos.messaging.feature/pom.xml index 363654e..e154f16 100644 --- a/features/messaging/org.apache.stratos.messaging.feature/pom.xml +++ b/features/messaging/org.apache.stratos.messaging.feature/pom.xml @@ -69,6 +69,7 @@ <bundles> <bundleDef>org.apache.stratos:org.apache.stratos.messaging:${project.version}</bundleDef> <bundleDef>com.google.code.gson:gson:${gson2.version}</bundleDef> + <bundleDef>org.apache.commons:commons-lang3:3.1</bundleDef> <bundleDef>org.slf4j:slf4j-api:1.6.4</bundleDef> <bundleDef>org.slf4j:slf4j-log4j12:1.6.4</bundleDef> </bundles> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 13b1e6a..d1c6607 100644 --- a/pom.xml +++ b/pom.xml @@ -431,7 +431,7 @@ <wso2carbon.version>4.1.0</wso2carbon.version> <carbon.platform.package.import.version.range>[4.1.0, 4.2.0)</carbon.platform.package.import.version.range> <carbon.platform.package.export.version>4.1.0</carbon.platform.package.export.version> - <axis2.osgi.version>1.6.1.wso2v9</axis2.osgi.version> - <jclouds.version>1.6.2-incubating</jclouds.version> + <axis2.osgi.version>1.6.1.wso2v9</axis2.osgi.version> + <jclouds.version>1.6.2-incubating</jclouds.version> </properties> </project> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/products/load-balancer/modules/distribution/src/main/bin/stratos.bat ---------------------------------------------------------------------- diff --git a/products/load-balancer/modules/distribution/src/main/bin/stratos.bat b/products/load-balancer/modules/distribution/src/main/bin/stratos.bat index aa4ae5f..5fe0d9a 100644 --- a/products/load-balancer/modules/distribution/src/main/bin/stratos.bat +++ b/products/load-balancer/modules/distribution/src/main/bin/stratos.bat @@ -154,7 +154,7 @@ set CARBON_CLASSPATH=.\lib;%CARBON_CLASSPATH% set JAVA_ENDORSED=".\lib\endorsed";"%JAVA_HOME%\jre\lib\endorsed";"%JAVA_HOME%\lib\endorsed" -set CMD_LINE_ARGS=-Xbootclasspath/a:%CARBON_XBOOTCLASSPATH% -Xms1500m -Xmx3000m -XX:PermSize=256m -XX:MaxPermSize=512m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:-UseGCOverheadLimit -XX:+CMSClassUnloadingEnabled -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%CARBON_HOME%\repository\logs\heap-dump.hprof -Dcom.sun.management.jmxremote -classpath %CARBON_CLASSPATH% %JAVA_OPTS% -Djava.endorsed.dirs=%JAVA_ENDORSED% -Dcarbon.registry.root=/ -Dcarbon.home="%CARBON_HOME%" -Dloadbalancer.conf="file:///%CARBON_HOME%/repository/conf/loadbalancer.conf" -Dwso2.server.standalone=true -Djava.command="%JAVA_HOME%\bin\java" -Djava.opts="%JAVA_OPTS%" -Djava.io.tmpdir="%CARBON_HOME%\tmp" -Dcatalina.base="%CARBON_HOME%\lib\tomcat" -Dwso2.carbon.xml=%CARBON_HOME%\repository\conf\carbon.xml -Dwso2.registry.xml="%CARBON_HOME%\repository\conf\registry.xml" -Dwso2.user.mgt.xml="%CARBON_HOME%\repository\conf\user-mgt.xml" -Djava.util.logging.config.file="%CARBON_HOME%\repository\conf\log4j.properties" - Dcarbon.config.dir.path="%CARBON_HOME%\repository\conf" -Djndi.properties.dir="%CARBON_HOME%/repository/conf" -Dconf.location="%CARBON_HOME%\repository\conf" -Dcarbon.logs.path="%CARBON_HOME%\repository\logs" -Dcomponents.repo="%CARBON_HOME%\repository\components" -Dcom.atomikos.icatch.file="%CARBON_HOME%\lib\transactions.properties" -Dcom.atomikos.icatch.hide_init_file_path="true" -Dorg.apache.jasper.runtime.BodyContentImpl.LIMIT_BUFFER=true -Dcom.sun.jndi.ldap.connect.pool.authentication=simple -Dcom.sun.jndi.ldap.connect.pool.timeout=3000 -Dorg.terracotta.quartz.skipUpdateCheck=true -Dcarbon.classpath=%CARBON_CLASSPATH% -Dfile.encoding=UTF8 -Djavax.net.ssl.trustStore=$CARBON_HOME/repository/resources/security/client-truststore.jks -Djavax.net.ssl.trustStorePassword=wso2carbon -Dthrift.receiver.ip=localhost -Dthrift.receiver.port=7615 +set CMD_LINE_ARGS=-Xbootclasspath/a:%CARBON_XBOOTCLASSPATH% -Xms1500m -Xmx3000m -XX:PermSize=256m -XX:MaxPermSize=512m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:-UseGCOverheadLimit -XX:+CMSClassUnloadingEnabled -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%CARBON_HOME%\repository\logs\heap-dump.hprof -Dcom.sun.management.jmxremote -classpath %CARBON_CLASSPATH% %JAVA_OPTS% -Djava.endorsed.dirs=%JAVA_ENDORSED% -Dcarbon.registry.root=/ -Dcarbon.home="%CARBON_HOME%" -Dloadbalancer.conf="file:///%CARBON_HOME%/repository/conf/loadbalancer.conf" -Dwso2.server.standalone=true -Djava.command="%JAVA_HOME%\bin\java" -Djava.opts="%JAVA_OPTS%" -Djava.io.tmpdir="%CARBON_HOME%\tmp" -Dcatalina.base="%CARBON_HOME%\lib\tomcat" -Dwso2.carbon.xml=%CARBON_HOME%\repository\conf\carbon.xml -Dwso2.registry.xml="%CARBON_HOME%\repository\conf\registry.xml" -Dwso2.user.mgt.xml="%CARBON_HOME%\repository\conf\user-mgt.xml" -Djava.util.logging.config.file="%CARBON_HOME%\repository\conf\log4j.properties" - Dcarbon.config.dir.path="%CARBON_HOME%\repository\conf" -Djndi.properties.dir="%CARBON_HOME%/repository/conf" -Dconf.location="%CARBON_HOME%\repository\conf" -Dcarbon.logs.path="%CARBON_HOME%\repository\logs" -Dcomponents.repo="%CARBON_HOME%\repository\components" -Dcom.atomikos.icatch.file="%CARBON_HOME%\lib\transactions.properties" -Dcom.atomikos.icatch.hide_init_file_path="true" -Dorg.apache.jasper.runtime.BodyContentImpl.LIMIT_BUFFER=true -Dcom.sun.jndi.ldap.connect.pool.authentication=simple -Dcom.sun.jndi.ldap.connect.pool.timeout=3000 -Dorg.terracotta.quartz.skipUpdateCheck=true -Dcarbon.classpath=%CARBON_CLASSPATH% -Dfile.encoding=UTF8 -Djavax.net.ssl.trustStore=$CARBON_HOME/repository/resources/security/client-truststore.jks -Djavax.net.ssl.trustStorePassword=wso2carbon -Dthrift.receiver.ip=localhost -Dthrift.receiver.port=7615 -Dstratos.messaging.topology.service.filter="" :runJava echo JAVA_HOME environment variable is set to %JAVA_HOME% http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/c852a28a/products/load-balancer/modules/distribution/src/main/bin/stratos.sh ---------------------------------------------------------------------- diff --git a/products/load-balancer/modules/distribution/src/main/bin/stratos.sh b/products/load-balancer/modules/distribution/src/main/bin/stratos.sh index 85dbd51..1e17dcc 100644 --- a/products/load-balancer/modules/distribution/src/main/bin/stratos.sh +++ b/products/load-balancer/modules/distribution/src/main/bin/stratos.sh @@ -298,6 +298,7 @@ exec "$JAVACMD" \ -Djavax.net.ssl.trustStorePassword=wso2carbon \ -Dthrift.receiver.ip=localhost \ -Dthrift.receiver.port=7615 \ + -Dstratos.messaging.topology.service.filter="" \ org.wso2.carbon.bootstrap.Bootstrap $*
