Repository: stratos Updated Branches: refs/heads/master 99182790b -> b7897af9c
Adding topology application filter Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b7897af9 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b7897af9 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b7897af9 Branch: refs/heads/master Commit: b7897af9c971dc51944ff5dacf6118a349f4a2bc Parents: 9918279 Author: Imesh Gunaratne <[email protected]> Authored: Tue Jun 30 07:12:08 2015 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Tue Jun 30 07:12:33 2015 +0530 ---------------------------------------------------------------------- components/org.apache.stratos.common/pom.xml | 1 + .../common/constants/StratosConstants.java | 1 + .../topology/TopologyApplicationFilter.java | 98 ++++++++++++++++++++ .../filter/topology/TopologyClusterFilter.java | 8 +- .../filter/topology/TopologyMemberFilter.java | 29 +----- .../filter/topology/TopologyServiceFilter.java | 4 +- ...licationClustersCreatedMessageProcessor.java | 7 ++ .../ClusterCreatedMessageProcessor.java | 7 ++ .../ClusterInstanceActivatedProcessor.java | 7 ++ .../ClusterInstanceCreatedMessageProcessor.java | 7 ++ .../ClusterInstanceInactivateProcessor.java | 7 ++ .../ClusterInstanceTerminatedProcessor.java | 7 ++ .../ClusterInstanceTerminatingProcessor.java | 7 ++ .../ClusterRemovedMessageProcessor.java | 9 ++ .../topology/ClusterResetMessageProcessor.java | 7 ++ .../CompleteTopologyMessageProcessor.java | 10 +- .../MemberActivatedMessageProcessor.java | 7 ++ .../topology/MemberCreatedMessageProcessor.java | 7 ++ .../MemberInitializedMessageProcessor.java | 7 ++ .../MemberMaintenanceModeProcessor.java | 7 ++ .../MemberReadyToShutdownMessageProcessor.java | 7 ++ .../topology/MemberStartedMessageProcessor.java | 7 ++ .../MemberSuspendedMessageProcessor.java | 7 ++ .../MemberTerminatedMessageProcessor.java | 8 ++ .../messaging/test/MessageFilterTest.java | 19 ++-- 25 files changed, 252 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.common/pom.xml ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/pom.xml b/components/org.apache.stratos.common/pom.xml index 316aef1..a957d68 100644 --- a/components/org.apache.stratos.common/pom.xml +++ b/components/org.apache.stratos.common/pom.xml @@ -47,6 +47,7 @@ <Bundle-Name>${project.artifactId}</Bundle-Name> <Export-Package> org.apache.stratos.common.*, + org.apache.stratos.common.constants.*, org.apache.stratos.common.domain.*, org.apache.stratos.common.client.*, org.apache.stratos.common.services.*, http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java index 194bd81..1275f5c 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java @@ -168,6 +168,7 @@ public class StratosConstants { public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingTerminationMemberExpiryTimeout"; public static final String FILTER_VALUE_SEPARATOR = ","; + public static final String TOPOLOGY_APPLICATION_FILTER = "stratos.topology.application.filter"; public static final String TOPOLOGY_SERVICE_FILTER = "stratos.topology.service.filter"; public static final String TOPOLOGY_CLUSTER_FILTER = "stratos.topology.cluster.filter"; public static final String TOPOLOGY_MEMBER_FILTER = "stratos.topology.member.filter"; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyApplicationFilter.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyApplicationFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyApplicationFilter.java new file mode 100644 index 0000000..256bda6 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyApplicationFilter.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.filter.topology; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.message.filter.MessageFilter; + +import java.util.Collection; + +/** + * A filter to discard topology events which are not in a given application id list. + */ +public class TopologyApplicationFilter extends MessageFilter { + + private static final Log log = LogFactory.getLog(TopologyServiceFilter.class); + + public static final String TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID = "application-id"; + + private static volatile TopologyApplicationFilter instance; + + public TopologyApplicationFilter() { + super(StratosConstants.TOPOLOGY_APPLICATION_FILTER); + } + + /** + * Returns true if application is excluded else returns false. + * + * @param applicationId + * @return + */ + public static boolean apply(String applicationId) { + boolean excluded = false; + if (getInstance().isActive()) { + if (StringUtils.isNotBlank(applicationId) && getInstance().applicationExcluded(applicationId)) { + excluded = true; + } + if (excluded && log.isInfoEnabled()) { + log.info(String.format("Application is excluded: [application-id] %s", applicationId)); + } + } + return excluded; + } + + public static TopologyApplicationFilter getInstance() { + if (instance == null) { + synchronized (TopologyApplicationFilter.class) { + if (instance == null) { + instance = new TopologyApplicationFilter(); + if (log.isDebugEnabled()) { + log.debug("Topology application filter instance created"); + } + } + } + } + return instance; + } + + private boolean applicationExcluded(String value) { + return excluded(TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID, value); + } + + private Collection<String> getIncludedApplicationIds() { + return getIncludedPropertyValues(TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID + "="); + for (String applicationId : TopologyApplicationFilter.getInstance().getIncludedApplicationIds()) { + if (sb.length() > 0) { + sb.append(", "); + } + sb.append(applicationId); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java index a34d072..350a142 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java @@ -22,6 +22,7 @@ package org.apache.stratos.messaging.message.filter.topology; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.message.filter.MessageFilter; import java.util.Collection; @@ -34,12 +35,11 @@ public class TopologyClusterFilter extends MessageFilter { private static final Log log = LogFactory.getLog(TopologyServiceFilter.class); public static final String TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID = "cluster-id"; - public static final String TOPOLOGY_CLUSTER_FILTER = "stratos.topology.cluster.filter"; private static volatile TopologyClusterFilter instance; public TopologyClusterFilter() { - super(TOPOLOGY_CLUSTER_FILTER); + super(StratosConstants.TOPOLOGY_CLUSTER_FILTER); } /** @@ -54,8 +54,8 @@ public class TopologyClusterFilter extends MessageFilter { if (StringUtils.isNotBlank(clusterId) && getInstance().clusterExcluded(clusterId)) { excluded = true; } - if (excluded && log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", clusterId)); + if (excluded && log.isInfoEnabled()) { + log.info(String.format("Cluster is excluded: [cluster-id] %s", clusterId)); } } return excluded; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java index 5bdab83..32d7c25 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java @@ -23,6 +23,7 @@ package org.apache.stratos.messaging.message.filter.topology; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.message.filter.MessageFilter; import java.util.Collection; @@ -33,16 +34,13 @@ import java.util.Collection; public class TopologyMemberFilter extends MessageFilter { private static final Log log = LogFactory.getLog(TopologyServiceFilter.class); - private static final String LINE_SEPARATOR = System.getProperty("line.separator"); - public static final String TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID = "lb-cluster-id"; public static final String TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID = "network-partition-id"; - public static final String TOPOLOGY_MEMBER_FILTER = "stratos.topology.member.filter"; private static volatile TopologyMemberFilter instance; public TopologyMemberFilter() { - super(TOPOLOGY_MEMBER_FILTER); + super(StratosConstants.TOPOLOGY_MEMBER_FILTER); } /** @@ -55,14 +53,11 @@ public class TopologyMemberFilter extends MessageFilter { public static boolean apply(String lbClusterId, String networkPartitionId) { boolean excluded = false; if (getInstance().isActive()) { - if (StringUtils.isNotBlank(lbClusterId) && getInstance().lbClusterIdExcluded(lbClusterId)) { - excluded = true; - } if (StringUtils.isNotBlank(networkPartitionId) && getInstance().networkPartitionExcluded(networkPartitionId)) { excluded = true; } - if (excluded && log.isDebugEnabled()) { - log.debug(String.format("Member is excluded: [lb-cluster] %s", lbClusterId)); + if (excluded && log.isInfoEnabled()) { + log.info(String.format("Member is excluded: [network-partition-id] %s", networkPartitionId)); } } return excluded; @@ -82,14 +77,6 @@ public class TopologyMemberFilter extends MessageFilter { return instance; } - private boolean lbClusterIdExcluded(String value) { - return excluded(TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID, value); - } - - private Collection<String> getIncludedLbClusterIds() { - return getIncludedPropertyValues(TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID); - } - private boolean networkPartitionExcluded(String value) { return excluded(TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID, value); } @@ -101,14 +88,6 @@ public class TopologyMemberFilter extends MessageFilter { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID + "="); - for (String clusterId : getInstance().getIncludedLbClusterIds()) { - if (sb.length() > 0) { - sb.append(", "); - } - sb.append(clusterId); - } - sb.append(LINE_SEPARATOR); sb.append(TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID + "="); for (String networkPartitionId : getInstance().getIncludedNetworkPartitionIds()) { if (sb.length() > 0) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java index d74b6cc..63bb7b9 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java @@ -54,8 +54,8 @@ public class TopologyServiceFilter extends MessageFilter { if (StringUtils.isNotBlank(serviceName) && getInstance().serviceExcluded(serviceName)) { excluded = true; } - if (excluded && log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [lb-cluster] %s", serviceName)); + if (excluded && log.isInfoEnabled()) { + log.info(String.format("Service is excluded: [service-name] %s", serviceName)); } } return excluded; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java index 374ab0f..dcae73e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java @@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ApplicationClustersCreatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -73,10 +74,16 @@ public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor List<Cluster> clusters = event.getClusterList(); for (Cluster cluster : clusters) { + String applicationId = cluster.getAppId(); String serviceName = cluster.getServiceName(); String clusterId = cluster.getClusterId(); TopologyUpdater.acquireWriteLockForService(serviceName); + try { + // Apply application filter + if(TopologyApplicationFilter.apply(applicationId)) { + continue; + } // Apply service filter if (TopologyServiceFilter.apply(serviceName)) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java index 036fe19..0e14bcd 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java @@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -73,9 +74,15 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor { private boolean doProcess(ClusterCreatedEvent event, Topology topology) { Cluster cluster = event.getCluster(); + String applicationId = cluster.getAppId(); String serviceName = cluster.getServiceName(); String clusterId = cluster.getClusterId(); + // Apply application filter + if(TopologyApplicationFilter.apply(applicationId)) { + return false; + } + // Apply service filter if (TopologyServiceFilter.apply(serviceName)) { return false; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java index 09d8d84..3d67d72 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.instance.ClusterInstance; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.ClusterInstanceActivatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -80,9 +81,15 @@ public class ClusterInstanceActivatedProcessor extends MessageProcessor { private boolean doProcess(ClusterInstanceActivatedEvent event, Topology topology) { + String applicationId = event.getAppId(); String serviceName = event.getServiceName(); String clusterId = event.getClusterId(); + // Apply application filter + if(TopologyApplicationFilter.apply(applicationId)) { + return false; + } + // Apply service filter if (TopologyServiceFilter.apply(serviceName)) { return false; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java index d7e4606..8fb28cb 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterInstanceCreatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -97,8 +98,14 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor { } return false; } + Cluster cluster = service.getCluster(event.getClusterId()); + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + if (cluster == null) { if (log.isDebugEnabled()) { log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(), http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java index 319d71b..c86efe3 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java @@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterInstanceInactivateEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -79,9 +80,15 @@ public class ClusterInstanceInactivateProcessor extends MessageProcessor { private boolean doProcess(ClusterInstanceInactivateEvent event, Topology topology) { + String applicationId = event.getAppId(); String serviceName = event.getServiceName(); String clusterId = event.getClusterId(); + // Apply application filter + if(TopologyApplicationFilter.apply(applicationId)) { + return false; + } + // Apply service filter if (TopologyServiceFilter.apply(serviceName)) { return false; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java index 3a56440..e307bd8 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java @@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterInstanceTerminatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -79,9 +80,15 @@ public class ClusterInstanceTerminatedProcessor extends MessageProcessor { private boolean doProcess(ClusterInstanceTerminatedEvent event, Topology topology) { + String applicationId = event.getAppId(); String serviceName = event.getServiceName(); String clusterId = event.getClusterId(); + // Apply application filter + if(TopologyApplicationFilter.apply(applicationId)) { + return false; + } + // Apply service filter if (TopologyServiceFilter.apply(serviceName)) { return false; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java index 75b53b1..158d10f 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java @@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterInstanceTerminatingEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -79,9 +80,15 @@ public class ClusterInstanceTerminatingProcessor extends MessageProcessor { private boolean doProcess(ClusterInstanceTerminatingEvent event, Topology topology) { + String applicationId = event.getAppId(); String serviceName = event.getServiceName(); String clusterId = event.getClusterId(); + // Apply application filter + if(TopologyApplicationFilter.apply(applicationId)) { + return false; + } + // Apply service filter if (TopologyServiceFilter.apply(serviceName)) { return false; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java index 361c6d2..d38ada2 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java @@ -20,9 +20,11 @@ package org.apache.stratos.messaging.message.processor.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -104,6 +106,13 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor { } } else { + Cluster cluster = service.getCluster(event.getClusterId()); + + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + // Apply changes to the topology service.removeCluster(event.getClusterId()); http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java index 1168776..dc94a2a 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java @@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterResetEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -76,9 +77,15 @@ public class ClusterResetMessageProcessor extends MessageProcessor { private boolean doProcess(ClusterResetEvent event, Topology topology) { + String applicationId = event.getAppId(); String serviceName = event.getServiceName(); String clusterId = event.getClusterId(); + // Apply application filter + if(TopologyApplicationFilter.apply(applicationId)) { + return false; + } + // Apply service filter if (TopologyServiceFilter.apply(serviceName)) { return false; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java index 0317378..6172654 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -88,13 +89,16 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor { topology.addService(service); } - // Apply cluster filter + // Apply application & cluster filters for (Service service : topology.getServices()) { List<Cluster> clustersToRemove = new ArrayList<Cluster>(); for (Cluster cluster : service.getClusters()) { - if (TopologyClusterFilter.apply(cluster.getClusterId())) { + if (TopologyApplicationFilter.apply(cluster.getAppId())) { clustersToRemove.add(cluster); - } else { + } else if (TopologyClusterFilter.apply(cluster.getClusterId())) { + clustersToRemove.add(cluster); + } + else { // Add non filtered clusters to clusterId-cluster map if (!topology.clusterExist(cluster.getClusterId())) { topology.addToCluterMap(cluster); http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java index 99ca9b8..6e1d9d8 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -71,9 +72,15 @@ public class MemberActivatedMessageProcessor extends MessageProcessor { private boolean doProcess(MemberActivatedEvent event, Topology topology) { + String applicationId = event.getApplicationId(); String serviceName = event.getServiceName(); String clusterId = event.getClusterId(); + // Apply application filter + if(TopologyApplicationFilter.apply(applicationId)) { + return false; + } + // Apply service filter if (TopologyServiceFilter.apply(serviceName)) { return false; http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java index 3669e6c..d6bfd4d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.MemberCreatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; @@ -103,6 +104,12 @@ public class MemberCreatedMessageProcessor extends MessageProcessor { } return false; } + + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + if (cluster.memberExists(event.getMemberId())) { if (log.isDebugEnabled()) { log.debug(String.format("Member already exists: [service] %s [cluster] %s [member] %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java index bf3bb97..57d0680 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.MemberInitializedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -99,6 +100,12 @@ public class MemberInitializedMessageProcessor extends MessageProcessor { } return false; } + + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + Member member = cluster.getMember(event.getMemberId()); if (member == null) { if (log.isWarnEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java index b0f13b5..f0b53ff 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -102,6 +103,12 @@ public class MemberMaintenanceModeProcessor extends MessageProcessor { } return false; } + + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + Member member = cluster.getMember(event.getMemberId()); if (member == null) { if (log.isWarnEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java index 067d729..628123f 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -101,6 +102,12 @@ public class MemberReadyToShutdownMessageProcessor extends MessageProcessor { } return false; } + + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + Member member = cluster.getMember(event.getMemberId()); if (member == null) { if (log.isWarnEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java index 51345e9..990d337 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.MemberStartedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -101,6 +102,12 @@ public class MemberStartedMessageProcessor extends MessageProcessor { } return false; } + + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + Member member = cluster.getMember(event.getMemberId()); if (member == null) { if (log.isWarnEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java index da56970..a8b3ac9 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -102,6 +103,12 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor { } return false; } + + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + Member member = cluster.getMember(event.getMemberId()); if (member == null) { if (log.isWarnEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java index 468be6e..ee33269 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -95,6 +96,7 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor { } return false; } + Cluster cluster = service.getCluster(event.getClusterId()); if (cluster == null) { if (log.isWarnEnabled()) { @@ -102,6 +104,12 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor { } return false; } + + // Apply application filter + if(TopologyApplicationFilter.apply(cluster.getAppId())) { + return false; + } + Member member = cluster.getMember(event.getMemberId()); if (member != null) { // Apply member filter http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java index 2ee08df..dd901fe 100755 --- a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java +++ b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java @@ -20,6 +20,7 @@ package org.apache.stratos.messaging.test; import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.message.filter.MessageFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -104,12 +105,7 @@ public class MessageFilterTest { @Test public final void testMemberFilter() { System.setProperty(StratosConstants.TOPOLOGY_MEMBER_FILTER, - TopologyMemberFilter.TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID + "=lb-cluster1,lb-cluster2|" + - TopologyMemberFilter.TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID + "=np1,np2"); - - assertFalse(TopologyMemberFilter.apply("lb-cluster1", null)); - assertFalse(TopologyMemberFilter.apply("lb-cluster2", null)); - assertTrue(TopologyMemberFilter.apply("lb-cluster3", null)); + TopologyMemberFilter.TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID + "=np1,np2"); assertFalse(TopologyMemberFilter.apply(null, "np1")); assertFalse(TopologyMemberFilter.apply(null, "np2")); @@ -119,4 +115,15 @@ public class MessageFilterTest { assertFalse(TopologyMemberFilter.apply("lb-cluster2", "np2")); assertTrue(TopologyMemberFilter.apply("lb-cluster3", "np3")); } + + @Test + public final void testApplicationFilter() { + System.setProperty(StratosConstants.TOPOLOGY_APPLICATION_FILTER, + TopologyApplicationFilter.TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID + + "=application-1,application-2"); + + assertFalse(TopologyApplicationFilter.apply("application-1")); + assertFalse(TopologyApplicationFilter.apply("application-2")); + assertTrue(TopologyApplicationFilter.apply("application-3")); + } }
