Repository: stratos Updated Branches: refs/heads/master 790437ee7 -> b53839336
Introducing messaging package in load balancer component Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b5383933 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b5383933 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b5383933 Branch: refs/heads/master Commit: b5383933663cc2fa865dad86baf74d24edff8b70 Parents: 790437e Author: Imesh Gunaratne <[email protected]> Authored: Thu Nov 27 14:41:45 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Thu Nov 27 14:41:45 2014 +0530 ---------------------------------------------------------------------- .../LoadBalancerTenantEventReceiver.java | 219 ---------- .../LoadBalancerTopologyEventReceiver.java | 421 ------------------- .../stratos/load/balancer/RequestDelegator.java | 138 ------ .../balancer/endpoint/RequestDelegator.java | 138 ++++++ .../TenantAwareLoadBalanceEndpoint.java | 1 - .../internal/LoadBalancerServiceComponent.java | 4 +- .../LoadBalancerTenantEventReceiver.java | 219 ++++++++++ .../LoadBalancerTopologyEventReceiver.java | 421 +++++++++++++++++++ 8 files changed, 780 insertions(+), 781 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/b5383933/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantEventReceiver.java deleted file mode 100644 index 058553b..0000000 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantEventReceiver.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.load.balancer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil; -import org.apache.stratos.messaging.domain.tenant.Subscription; -import org.apache.stratos.messaging.domain.tenant.SubscriptionDomain; -import org.apache.stratos.messaging.domain.tenant.Tenant; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.domain.topology.ServiceType; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.tenant.*; -import org.apache.stratos.messaging.listener.tenant.*; -import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -/** - * Load balancer tenant receiver updates load balancer context according to - * incoming tenant events. - */ -public class LoadBalancerTenantEventReceiver implements Runnable { - - private static final Log log = LogFactory.getLog(LoadBalancerTenantEventReceiver.class); - - private final TenantEventReceiver tenantEventReceiver; - private boolean terminated; - - public LoadBalancerTenantEventReceiver() { - tenantEventReceiver = new TenantEventReceiver(); - addEventListeners(); - } - - private void addEventListeners() { - tenantEventReceiver.addEventListener(new CompleteTenantEventListener() { - private boolean initialized; - - @Override - protected void onEvent(Event event) { - if (!initialized) { - CompleteTenantEvent completeTenantEvent = (CompleteTenantEvent) event; - if (log.isDebugEnabled()) { - log.debug("Complete tenant event received"); - } - for (Tenant tenant : completeTenantEvent.getTenants()) { - for (Subscription subscription : tenant.getSubscriptions()) { - if (isMultiTenantService(subscription.getServiceName())) { - LoadBalancerContextUtil.addClustersAgainstHostNamesAndTenantIds( - subscription.getServiceName(), - tenant.getTenantId(), - subscription.getClusterIds()); - } - - for (SubscriptionDomain subscriptionDomain : subscription.getSubscriptionDomains()) { - LoadBalancerContextUtil.addClustersAgainstDomain( - subscription.getServiceName(), - subscription.getClusterIds(), - subscriptionDomain.getDomainName()); - - LoadBalancerContextUtil.addAppContextAgainstDomain(subscriptionDomain.getDomainName(), - subscriptionDomain.getApplicationContext()); - } - } - } - initialized = true; - } - } - }); - tenantEventReceiver.addEventListener(new TenantSubscribedEventListener() { - @Override - protected void onEvent(Event event) { - TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event; - if (log.isDebugEnabled()) { - log.debug(String.format("Tenant subscribed event received: [tenant-id] %d [service] %s [cluster-ids] %s", - tenantSubscribedEvent.getTenantId(), - tenantSubscribedEvent.getServiceName(), - tenantSubscribedEvent.getClusterIds())); - } - - if (isMultiTenantService(tenantSubscribedEvent.getServiceName())) { - LoadBalancerContextUtil.addClustersAgainstHostNamesAndTenantIds( - tenantSubscribedEvent.getServiceName(), - tenantSubscribedEvent.getTenantId(), - tenantSubscribedEvent.getClusterIds()); - } - } - }); - tenantEventReceiver.addEventListener(new TenantUnSubscribedEventListener() { - @Override - protected void onEvent(Event event) { - TenantUnSubscribedEvent tenantUnSubscribedEvent = (TenantUnSubscribedEvent) event; - if (log.isDebugEnabled()) { - log.debug(String.format("Tenant un-subscribed event received: [tenant-id] %d [service] %s [cluster-ids] %s", - tenantUnSubscribedEvent.getTenantId(), - tenantUnSubscribedEvent.getServiceName(), - tenantUnSubscribedEvent.getClusterIds())); - } - - if (isMultiTenantService(tenantUnSubscribedEvent.getServiceName())) { - LoadBalancerContextUtil.removeClustersAgainstHostNamesAndTenantIds( - tenantUnSubscribedEvent.getServiceName(), - tenantUnSubscribedEvent.getTenantId(), - tenantUnSubscribedEvent.getClusterIds() - ); - } - - LoadBalancerContextUtil.removeClustersAgainstAllDomains( - tenantUnSubscribedEvent.getServiceName(), - tenantUnSubscribedEvent.getTenantId(), - tenantUnSubscribedEvent.getClusterIds()); - - LoadBalancerContextUtil.removeAppContextAgainstAllDomains( - tenantUnSubscribedEvent.getServiceName(), - tenantUnSubscribedEvent.getTenantId()); - } - }); - tenantEventReceiver.addEventListener(new SubscriptionDomainsAddedEventListener() { - @Override - protected void onEvent(Event event) { - SubscriptionDomainAddedEvent subscriptionDomainAddedEvent = (SubscriptionDomainAddedEvent) event; - if (log.isDebugEnabled()) { - log.debug(String.format("Tenant subscription domain added event received: [tenant-id] %d " + - "[service] %s [cluster-ids] %s [domain-name] %s", - subscriptionDomainAddedEvent.getTenantId(), - subscriptionDomainAddedEvent.getServiceName(), - subscriptionDomainAddedEvent.getClusterIds(), - subscriptionDomainAddedEvent.getDomainName())); - } - - LoadBalancerContextUtil.addClustersAgainstDomain( - subscriptionDomainAddedEvent.getServiceName(), - subscriptionDomainAddedEvent.getClusterIds(), - subscriptionDomainAddedEvent.getDomainName()); - - LoadBalancerContextUtil.addAppContextAgainstDomain( - subscriptionDomainAddedEvent.getDomainName(), - subscriptionDomainAddedEvent.getApplicationContext()); - } - }); - tenantEventReceiver.addEventListener(new SubscriptionDomainsRemovedEventListener() { - @Override - protected void onEvent(Event event) { - SubscriptionDomainRemovedEvent subscriptionDomainRemovedEvent = (SubscriptionDomainRemovedEvent) event; - if (log.isDebugEnabled()) { - log.debug(String.format("Tenant subscription domain removed event received: [tenant-id] %d " + - "[service] %s [cluster-ids] %s [domain-name] %s", - subscriptionDomainRemovedEvent.getTenantId(), - subscriptionDomainRemovedEvent.getServiceName(), - subscriptionDomainRemovedEvent.getClusterIds(), - subscriptionDomainRemovedEvent.getDomainName())); - } - - LoadBalancerContextUtil.removeClustersAgainstDomain( - subscriptionDomainRemovedEvent.getServiceName(), - subscriptionDomainRemovedEvent.getClusterIds(), - subscriptionDomainRemovedEvent.getDomainName()); - - LoadBalancerContextUtil.removeAppContextAgainstDomain( - subscriptionDomainRemovedEvent.getDomainName()); - } - }); - } - - private boolean isMultiTenantService(String serviceName) { - try { - TopologyManager.acquireReadLock(); - Service service = TopologyManager.getTopology().getService(serviceName); - if (service != null) { - return (service.getServiceType() == ServiceType.MultiTenant); - } - return false; - } finally { - TopologyManager.releaseReadLock(); - } - } - - @Override - public void run() { - Thread tenantReceiverThread = new Thread(tenantEventReceiver); - tenantReceiverThread.start(); - - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - if (log.isInfoEnabled()) { - log.info("Load balancer tenant receiver thread terminated"); - } - } - - /** - * Terminate load balancer tenant receiver thread. - */ - public void terminate() { - tenantEventReceiver.terminate(); - terminated = true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/b5383933/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java deleted file mode 100644 index 053de8b..0000000 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java +++ /dev/null @@ -1,421 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.load.balancer; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.context.LoadBalancerContext; -import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil; -import org.apache.stratos.load.balancer.context.map.AlgorithmContextMap; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.MemberStatus; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.topology.*; -import org.apache.stratos.messaging.listener.topology.*; -import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -/** - * Load balancer topology receiver updates load balancer context according to - * incoming topology events. - */ -public class LoadBalancerTopologyEventReceiver implements Runnable { - - private static final Log log = LogFactory.getLog(LoadBalancerTopologyEventReceiver.class); - - private TopologyEventReceiver topologyEventReceiver; - private boolean terminated; - - public LoadBalancerTopologyEventReceiver() { - this.topologyEventReceiver = new TopologyEventReceiver(); - addEventListeners(); - } - - @Override - public void run() { - Thread thread = new Thread(topologyEventReceiver); - thread.start(); - if (log.isInfoEnabled()) { - log.info("Load balancer topology receiver thread started"); - } - - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - if (log.isInfoEnabled()) { - log.info("Load balancer topology receiver thread terminated"); - } - } - - private void addEventListeners() { - // Listen to topology events that affect clusters - topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { - private boolean initialized; - - @Override - protected void onEvent(Event event) { - if(!initialized) { - try { - TopologyManager.acquireReadLock(); - for (Service service : TopologyManager.getTopology().getServices()) { - for (Cluster cluster : service.getClusters()) { - if (clusterHasActiveMembers(cluster)) { - LoadBalancerContextUtil.addClusterAgainstHostNames(cluster); - } else { - if (log.isDebugEnabled()) { - log.debug("Cluster does not have any active members"); - } - } - for (Member member : cluster.getMembers()) { - if (member.getStatus() == MemberStatus.Activated) { - addMemberIpsToMemberIpHostnameMap(cluster, member); - } - - } - } - } - initialized = true; - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - TopologyManager.releaseReadLock(); - } - } - } - - private boolean clusterHasActiveMembers(Cluster cluster) { - for (Member member : cluster.getMembers()) { - if (member.getStatus() == MemberStatus.Activated) { - return true; - } - } - return false; - } - }); - topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { - @Override - protected void onEvent(Event event) { - - MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; - - //TopologyManager.acquireReadLock(); - TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(), - memberActivatedEvent.getClusterId()); - - try { - - Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service not found in topology: [service] %s", - memberActivatedEvent.getServiceName())); - } - return; - } - Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId()); - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s", - memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId())); - } - return; - } - Member member = cluster.getMember(memberActivatedEvent.getMemberId()); - if (member == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member not found in topology: [service] %s [cluster] %s [member] %s", - memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId(), - memberActivatedEvent.getMemberId())); - } - return; - } - - // Add member to member-ip -> hostname map - addMemberIpsToMemberIpHostnameMap(cluster, member); - - if (LoadBalancerContext.getInstance().getClusterIdClusterMap().containsCluster( - member.getClusterId())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster already exists in load balancer context: [service] %s " + - "[cluster] %s", member.getServiceName(), member.getClusterId())); - } - // At this point member is already added to the cluster object in load balancer context - return; - } - - // Add cluster to load balancer context when its first member is activated: - // Cluster not found in load balancer context, add it - LoadBalancerContextUtil.addClusterAgainstHostNames(cluster); - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - //TopologyManager.releaseReadLock(); - TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(), - memberActivatedEvent.getClusterId()); - } - } - }); - topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { - @Override - protected void onEvent(Event event) { - - MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; - - TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), - memberMaintenanceModeEvent.getClusterId()); - - try { - //TopologyManager.acquireReadLock(); - - Member member = findMember(memberMaintenanceModeEvent.getServiceName(), - memberMaintenanceModeEvent.getClusterId(), memberMaintenanceModeEvent.getMemberId()); - - if (member != null) { - removeMemberIpsFromMemberIpHostnameMap(member); - } - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - //TopologyManager.releaseReadLock(); - TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), - memberMaintenanceModeEvent.getClusterId()); - } - } - }); - topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() { - @Override - protected void onEvent(Event event) { - - MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event; - TopologyManager.acquireReadLockForCluster(memberSuspendedEvent.getServiceName(), - memberSuspendedEvent.getClusterId()); - - try { - //TopologyManager.acquireReadLock(); - Member member = findMember(memberSuspendedEvent.getServiceName(), - memberSuspendedEvent.getClusterId(), memberSuspendedEvent.getMemberId()); - - if (member != null) { - removeMemberIpsFromMemberIpHostnameMap(member); - } - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - //TopologyManager.releaseReadLock(); - TopologyManager.releaseReadLockForCluster(memberSuspendedEvent.getServiceName(), - memberSuspendedEvent.getClusterId()); - } - } - }); - topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { - @Override - protected void onEvent(Event event) { - - //TopologyManager.acquireReadLock(); - MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; - - TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(), - memberTerminatedEvent.getClusterId()); - - try { - Member member = findMember(memberTerminatedEvent.getServiceName(), - memberTerminatedEvent.getClusterId(), memberTerminatedEvent.getMemberId()); - - if (member != null) { - removeMemberIpsFromMemberIpHostnameMap(member); - } - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - //TopologyManager.releaseReadLock(); - TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(), - memberTerminatedEvent.getClusterId()); - } - } - }); - topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() { - @Override - protected void onEvent(Event event) { - - // Remove cluster from context - ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event; - TopologyManager.acquireReadLockForCluster(clusterRemovedEvent.getServiceName(), - clusterRemovedEvent.getClusterId()); - - try { - AlgorithmContextMap.getInstance().removeCluster(clusterRemovedEvent.getServiceName(), - clusterRemovedEvent.getClusterId()); - } catch (Exception e) { - log.error("Could not remove cluster from load balancer algorithm context map", e); - } - - try { - Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId()); - if (cluster != null) { - for (Member member : cluster.getMembers()) { - removeMemberIpsFromMemberIpHostnameMap(member); - } - LoadBalancerContextUtil.removeClusterAgainstHostNames(cluster.getClusterId()); - } else { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster not found in load balancer context: [service] %s [cluster] %s", - clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId())); - } - } - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - TopologyManager.releaseReadLockForCluster(clusterRemovedEvent.getServiceName(), - clusterRemovedEvent.getClusterId()); - } - } - }); - topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() { - @Override - protected void onEvent(Event event) { - - ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event; - TopologyManager.acquireReadLockForService(serviceRemovedEvent.getServiceName()); - - try { - //TopologyManager.acquireReadLock(); - - // Remove all clusters of given service from context - Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName()); - if (service != null) { - for (Cluster cluster : service.getClusters()) { - for (Member member : cluster.getMembers()) { - removeMemberIpsFromMemberIpHostnameMap(member); - } - LoadBalancerContextUtil.removeClusterAgainstHostNames(cluster.getClusterId()); - } - } else { - if (log.isWarnEnabled()) { - log.warn(String.format("Service not found in topology: [service] %s", - serviceRemovedEvent.getServiceName())); - } - } - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - //TopologyManager.releaseReadLock(); - TopologyManager.releaseReadLockForService(serviceRemovedEvent.getServiceName()); - } - } - }); - } - - private Member findMember(String serviceName, String clusterId, String memberId) { - Service service = TopologyManager.getTopology().getService(serviceName); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service not found in topology: [service] %s", serviceName)); - } - return null; - } - - Cluster cluster = service.getCluster(clusterId); - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s", serviceName, clusterId)); - } - return null; - } - - Member member = cluster.getMember(memberId); - if (member == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member not found in topology: [service] %s [cluster] %s [member] %s", serviceName, - clusterId, memberId)); - } - return null; - } - return member; - } - - private void addMemberIpsToMemberIpHostnameMap(Cluster cluster, Member member) { - if ((cluster.getHostNames() == null) || (cluster.getHostNames().size() == 0)) { - if (log.isWarnEnabled()) { - log.warn(String.format("Hostnames not found in cluster %s, could not add member ips to member-ip " + - "-> hostname map", member.getClusterId())); - } - return; - } - - String hostname = cluster.getHostNames().get(0); - if (cluster.getHostNames().size() > 1) { - if (log.isWarnEnabled()) { - log.warn(String.format("Multiple hostnames found in cluster %s, using %s", - cluster.getHostNames().toString(), hostname)); - } - } - - if (StringUtils.isNotBlank(member.getMemberIp())) { - LoadBalancerContext.getInstance().getMemberIpHostnameMap().put(member.getMemberIp(), hostname); - if (log.isDebugEnabled()) { - log.debug(String.format("Member private ip added to member-ip -> hostname map: [service] %s [cluster] " + - "%s [member] %s [private-ip] %s", member.getServiceName(), member.getClusterId(), - member.getMemberId(), member.getMemberIp() - )); - } - } - if (StringUtils.isNotBlank(member.getMemberPublicIp())) { - LoadBalancerContext.getInstance().getMemberIpHostnameMap().put(member.getMemberPublicIp(), hostname); - if (log.isDebugEnabled()) { - log.debug(String.format("Member public ip added to member-ip -> hostname map: [service] %s [cluster] " + - "%s [member] %s [public-ip] %s", member.getServiceName(), member.getClusterId(), - member.getMemberId(), member.getMemberPublicIp() - )); - } - } - } - - private void removeMemberIpsFromMemberIpHostnameMap(Member member) { - if (StringUtils.isNotBlank(member.getMemberIp())) { - LoadBalancerContext.getInstance().getMemberIpHostnameMap().remove(member.getMemberIp()); - if (log.isDebugEnabled()) { - log.debug(String.format("Member private ip removed from member-ip -> hostname map: [private-ip] %s", - member.getMemberIp())); - } - } - if (StringUtils.isNotBlank(member.getMemberPublicIp())) { - LoadBalancerContext.getInstance().getMemberIpHostnameMap().remove(member.getMemberPublicIp()); - if (log.isDebugEnabled()) { - log.debug(String.format("Member public ip removed from member-ip -> hostname map: [public-ip] %s", - member.getMemberPublicIp())); - } - } - } - - /** - * Terminate load balancer topology receiver thread. - */ - public void terminate() { - topologyEventReceiver.terminate(); - terminated = true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/b5383933/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java deleted file mode 100644 index 46de2cc..0000000 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.load.balancer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.context.AlgorithmContext; -import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithm; -import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; -import org.apache.stratos.load.balancer.context.ClusterContext; -import org.apache.stratos.load.balancer.context.LoadBalancerContext; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; - -import java.util.ArrayList; - -/** - * Implements core load balancing logic for identifying the next member - * according to the incoming request information. - */ -public class RequestDelegator { - private static final Log log = LogFactory.getLog(RequestDelegator.class); - - private LoadBalanceAlgorithm algorithm; - - public RequestDelegator(LoadBalanceAlgorithm algorithm) { - this.algorithm = algorithm; - } - - public Member findNextMemberFromHostName(String hostName, String messageId) { - if (hostName == null) - return null; - - long startTime = System.currentTimeMillis(); - - Cluster cluster = LoadBalancerContext.getInstance().getHostNameClusterMap().getCluster(hostName); - if (cluster != null) { - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster %s identified for request %s", cluster.getClusterId(), messageId)); - } - Member member = findNextMemberInCluster(cluster); - if (member != null) { - if (log.isDebugEnabled()) { - long endTime = System.currentTimeMillis(); - log.debug(String.format("Next member identified in %dms: [service] %s [cluster] %s [member] %s [request-id] %s", - (endTime - startTime), member.getServiceName(), member.getClusterId(), member.getMemberId(), messageId)); - } - } - return member; - } - else { - if(log.isWarnEnabled()) { - log.warn(String.format("Could not find a cluster for hostname %s", hostName)); - } - } - return null; - } - - public Member findNextMemberFromTenantId(String hostName, int tenantId) { - long startTime = System.currentTimeMillis(); - - // Find cluster from host name and tenant id - Cluster cluster = LoadBalancerContext.getInstance().getMultiTenantClusterMap().getCluster(hostName, tenantId); - if (cluster != null) { - Member member = findNextMemberInCluster(cluster); - if (member != null) { - if (log.isDebugEnabled()) { - long endTime = System.currentTimeMillis(); - log.debug(String.format("Next member identified in %dms: [service] %s [cluster] %s [tenant-id] %d [member] %s", - (endTime - startTime), member.getServiceName(), member.getClusterId(), tenantId, member.getMemberId())); - } - } - return member; - } - else { - if(log.isWarnEnabled()) { - log.warn(String.format("Could not find a cluster for hostname %s and tenant-id %d", hostName, tenantId)); - } - } - return null; - } - - /** - * This operation should be synchronized in order to find a member - * correctly. This has no performance impact as per the load tests - * carried out. - */ - private synchronized Member findNextMemberInCluster(Cluster cluster) { - // Find algorithm context of the cluster - ClusterContext clusterContext = LoadBalancerContext.getInstance().getClusterIdClusterContextMap().getClusterContext(cluster.getClusterId()); - if (clusterContext == null) { - clusterContext = new ClusterContext(cluster.getServiceName(), cluster.getClusterId()); - LoadBalancerContext.getInstance().getClusterIdClusterContextMap().addClusterContext(clusterContext); - } - - AlgorithmContext algorithmContext = clusterContext.getAlgorithmContext(); - if (algorithmContext == null) { - algorithmContext = new AlgorithmContext(cluster.getServiceName(), cluster.getClusterId()); - clusterContext.setAlgorithmContext(algorithmContext); - } - algorithm.setMembers(new ArrayList<Member>(cluster.getMembers())); - Member member = algorithm.getNextMember(algorithmContext); - if (member == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Could not find a member in cluster: [service] %s [cluster] %s", cluster.getServiceName(), cluster.getClusterId())); - } - } - return member; - } - - public boolean isTargetHostValid(String hostName) { - if (hostName == null) - return false; - - boolean valid = LoadBalancerContext.getInstance().getHostNameClusterMap().containsCluster(hostName); - if ((!valid) && (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled())) { - valid = LoadBalancerContext.getInstance().getMultiTenantClusterMap().containsClusters(hostName); - } - return valid; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/b5383933/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/RequestDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/RequestDelegator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/RequestDelegator.java new file mode 100644 index 0000000..ac94654 --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/RequestDelegator.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.endpoint; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.context.AlgorithmContext; +import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithm; +import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; +import org.apache.stratos.load.balancer.context.ClusterContext; +import org.apache.stratos.load.balancer.context.LoadBalancerContext; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; + +import java.util.ArrayList; + +/** + * Implements core load balancing logic for identifying the next member + * according to the incoming request information. + */ +public class RequestDelegator { + private static final Log log = LogFactory.getLog(RequestDelegator.class); + + private LoadBalanceAlgorithm algorithm; + + public RequestDelegator(LoadBalanceAlgorithm algorithm) { + this.algorithm = algorithm; + } + + public Member findNextMemberFromHostName(String hostName, String messageId) { + if (hostName == null) + return null; + + long startTime = System.currentTimeMillis(); + + Cluster cluster = LoadBalancerContext.getInstance().getHostNameClusterMap().getCluster(hostName); + if (cluster != null) { + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster %s identified for request %s", cluster.getClusterId(), messageId)); + } + Member member = findNextMemberInCluster(cluster); + if (member != null) { + if (log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); + log.debug(String.format("Next member identified in %dms: [service] %s [cluster] %s [member] %s [request-id] %s", + (endTime - startTime), member.getServiceName(), member.getClusterId(), member.getMemberId(), messageId)); + } + } + return member; + } + else { + if(log.isWarnEnabled()) { + log.warn(String.format("Could not find a cluster for hostname %s", hostName)); + } + } + return null; + } + + public Member findNextMemberFromTenantId(String hostName, int tenantId) { + long startTime = System.currentTimeMillis(); + + // Find cluster from host name and tenant id + Cluster cluster = LoadBalancerContext.getInstance().getMultiTenantClusterMap().getCluster(hostName, tenantId); + if (cluster != null) { + Member member = findNextMemberInCluster(cluster); + if (member != null) { + if (log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); + log.debug(String.format("Next member identified in %dms: [service] %s [cluster] %s [tenant-id] %d [member] %s", + (endTime - startTime), member.getServiceName(), member.getClusterId(), tenantId, member.getMemberId())); + } + } + return member; + } + else { + if(log.isWarnEnabled()) { + log.warn(String.format("Could not find a cluster for hostname %s and tenant-id %d", hostName, tenantId)); + } + } + return null; + } + + /** + * This operation should be synchronized in order to find a member + * correctly. This has no performance impact as per the load tests + * carried out. + */ + private synchronized Member findNextMemberInCluster(Cluster cluster) { + // Find algorithm context of the cluster + ClusterContext clusterContext = LoadBalancerContext.getInstance().getClusterIdClusterContextMap().getClusterContext(cluster.getClusterId()); + if (clusterContext == null) { + clusterContext = new ClusterContext(cluster.getServiceName(), cluster.getClusterId()); + LoadBalancerContext.getInstance().getClusterIdClusterContextMap().addClusterContext(clusterContext); + } + + AlgorithmContext algorithmContext = clusterContext.getAlgorithmContext(); + if (algorithmContext == null) { + algorithmContext = new AlgorithmContext(cluster.getServiceName(), cluster.getClusterId()); + clusterContext.setAlgorithmContext(algorithmContext); + } + algorithm.setMembers(new ArrayList<Member>(cluster.getMembers())); + Member member = algorithm.getNextMember(algorithmContext); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Could not find a member in cluster: [service] %s [cluster] %s", cluster.getServiceName(), cluster.getClusterId())); + } + } + return member; + } + + public boolean isTargetHostValid(String hostName) { + if (hostName == null) + return false; + + boolean valid = LoadBalancerContext.getInstance().getHostNameClusterMap().containsCluster(hostName); + if ((!valid) && (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled())) { + valid = LoadBalancerContext.getInstance().getMultiTenantClusterMap().containsClusters(hostName); + } + return valid; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b5383933/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index b77cf7c..83002c9 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -23,7 +23,6 @@ import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.description.TransportInDescription; import org.apache.commons.lang3.StringUtils; import org.apache.http.protocol.HTTP; -import org.apache.stratos.load.balancer.RequestDelegator; import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory; import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; import org.apache.stratos.load.balancer.conf.domain.MemberIpType; http://git-wip-us.apache.org/repos/asf/stratos/blob/b5383933/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index 9e68773..e94336a 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -24,8 +24,8 @@ import org.apache.axis2.engine.AxisConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.endpoint.EndpointDeployer; -import org.apache.stratos.load.balancer.LoadBalancerTenantEventReceiver; -import org.apache.stratos.load.balancer.LoadBalancerTopologyEventReceiver; +import org.apache.stratos.load.balancer.messaging.LoadBalancerTenantEventReceiver; +import org.apache.stratos.load.balancer.messaging.LoadBalancerTopologyEventReceiver; import org.apache.stratos.load.balancer.exception.TenantAwareLoadBalanceEndpointException; import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier; http://git-wip-us.apache.org/repos/asf/stratos/blob/b5383933/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTenantEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTenantEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTenantEventReceiver.java new file mode 100644 index 0000000..04e9d5f --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTenantEventReceiver.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.messaging; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil; +import org.apache.stratos.messaging.domain.tenant.Subscription; +import org.apache.stratos.messaging.domain.tenant.SubscriptionDomain; +import org.apache.stratos.messaging.domain.tenant.Tenant; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.ServiceType; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.tenant.*; +import org.apache.stratos.messaging.listener.tenant.*; +import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * Load balancer tenant receiver updates load balancer context according to + * incoming tenant events. + */ +public class LoadBalancerTenantEventReceiver implements Runnable { + + private static final Log log = LogFactory.getLog(LoadBalancerTenantEventReceiver.class); + + private final TenantEventReceiver tenantEventReceiver; + private boolean terminated; + + public LoadBalancerTenantEventReceiver() { + tenantEventReceiver = new TenantEventReceiver(); + addEventListeners(); + } + + private void addEventListeners() { + tenantEventReceiver.addEventListener(new CompleteTenantEventListener() { + private boolean initialized; + + @Override + protected void onEvent(Event event) { + if (!initialized) { + CompleteTenantEvent completeTenantEvent = (CompleteTenantEvent) event; + if (log.isDebugEnabled()) { + log.debug("Complete tenant event received"); + } + for (Tenant tenant : completeTenantEvent.getTenants()) { + for (Subscription subscription : tenant.getSubscriptions()) { + if (isMultiTenantService(subscription.getServiceName())) { + LoadBalancerContextUtil.addClustersAgainstHostNamesAndTenantIds( + subscription.getServiceName(), + tenant.getTenantId(), + subscription.getClusterIds()); + } + + for (SubscriptionDomain subscriptionDomain : subscription.getSubscriptionDomains()) { + LoadBalancerContextUtil.addClustersAgainstDomain( + subscription.getServiceName(), + subscription.getClusterIds(), + subscriptionDomain.getDomainName()); + + LoadBalancerContextUtil.addAppContextAgainstDomain(subscriptionDomain.getDomainName(), + subscriptionDomain.getApplicationContext()); + } + } + } + initialized = true; + } + } + }); + tenantEventReceiver.addEventListener(new TenantSubscribedEventListener() { + @Override + protected void onEvent(Event event) { + TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event; + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant subscribed event received: [tenant-id] %d [service] %s [cluster-ids] %s", + tenantSubscribedEvent.getTenantId(), + tenantSubscribedEvent.getServiceName(), + tenantSubscribedEvent.getClusterIds())); + } + + if (isMultiTenantService(tenantSubscribedEvent.getServiceName())) { + LoadBalancerContextUtil.addClustersAgainstHostNamesAndTenantIds( + tenantSubscribedEvent.getServiceName(), + tenantSubscribedEvent.getTenantId(), + tenantSubscribedEvent.getClusterIds()); + } + } + }); + tenantEventReceiver.addEventListener(new TenantUnSubscribedEventListener() { + @Override + protected void onEvent(Event event) { + TenantUnSubscribedEvent tenantUnSubscribedEvent = (TenantUnSubscribedEvent) event; + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant un-subscribed event received: [tenant-id] %d [service] %s [cluster-ids] %s", + tenantUnSubscribedEvent.getTenantId(), + tenantUnSubscribedEvent.getServiceName(), + tenantUnSubscribedEvent.getClusterIds())); + } + + if (isMultiTenantService(tenantUnSubscribedEvent.getServiceName())) { + LoadBalancerContextUtil.removeClustersAgainstHostNamesAndTenantIds( + tenantUnSubscribedEvent.getServiceName(), + tenantUnSubscribedEvent.getTenantId(), + tenantUnSubscribedEvent.getClusterIds() + ); + } + + LoadBalancerContextUtil.removeClustersAgainstAllDomains( + tenantUnSubscribedEvent.getServiceName(), + tenantUnSubscribedEvent.getTenantId(), + tenantUnSubscribedEvent.getClusterIds()); + + LoadBalancerContextUtil.removeAppContextAgainstAllDomains( + tenantUnSubscribedEvent.getServiceName(), + tenantUnSubscribedEvent.getTenantId()); + } + }); + tenantEventReceiver.addEventListener(new SubscriptionDomainsAddedEventListener() { + @Override + protected void onEvent(Event event) { + SubscriptionDomainAddedEvent subscriptionDomainAddedEvent = (SubscriptionDomainAddedEvent) event; + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant subscription domain added event received: [tenant-id] %d " + + "[service] %s [cluster-ids] %s [domain-name] %s", + subscriptionDomainAddedEvent.getTenantId(), + subscriptionDomainAddedEvent.getServiceName(), + subscriptionDomainAddedEvent.getClusterIds(), + subscriptionDomainAddedEvent.getDomainName())); + } + + LoadBalancerContextUtil.addClustersAgainstDomain( + subscriptionDomainAddedEvent.getServiceName(), + subscriptionDomainAddedEvent.getClusterIds(), + subscriptionDomainAddedEvent.getDomainName()); + + LoadBalancerContextUtil.addAppContextAgainstDomain( + subscriptionDomainAddedEvent.getDomainName(), + subscriptionDomainAddedEvent.getApplicationContext()); + } + }); + tenantEventReceiver.addEventListener(new SubscriptionDomainsRemovedEventListener() { + @Override + protected void onEvent(Event event) { + SubscriptionDomainRemovedEvent subscriptionDomainRemovedEvent = (SubscriptionDomainRemovedEvent) event; + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant subscription domain removed event received: [tenant-id] %d " + + "[service] %s [cluster-ids] %s [domain-name] %s", + subscriptionDomainRemovedEvent.getTenantId(), + subscriptionDomainRemovedEvent.getServiceName(), + subscriptionDomainRemovedEvent.getClusterIds(), + subscriptionDomainRemovedEvent.getDomainName())); + } + + LoadBalancerContextUtil.removeClustersAgainstDomain( + subscriptionDomainRemovedEvent.getServiceName(), + subscriptionDomainRemovedEvent.getClusterIds(), + subscriptionDomainRemovedEvent.getDomainName()); + + LoadBalancerContextUtil.removeAppContextAgainstDomain( + subscriptionDomainRemovedEvent.getDomainName()); + } + }); + } + + private boolean isMultiTenantService(String serviceName) { + try { + TopologyManager.acquireReadLock(); + Service service = TopologyManager.getTopology().getService(serviceName); + if (service != null) { + return (service.getServiceType() == ServiceType.MultiTenant); + } + return false; + } finally { + TopologyManager.releaseReadLock(); + } + } + + @Override + public void run() { + Thread tenantReceiverThread = new Thread(tenantEventReceiver); + tenantReceiverThread.start(); + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + if (log.isInfoEnabled()) { + log.info("Load balancer tenant receiver thread terminated"); + } + } + + /** + * Terminate load balancer tenant receiver thread. + */ + public void terminate() { + tenantEventReceiver.terminate(); + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b5383933/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java new file mode 100644 index 0000000..c50a2d2 --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.messaging; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.context.LoadBalancerContext; +import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil; +import org.apache.stratos.load.balancer.context.map.AlgorithmContextMap; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.MemberStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.topology.*; +import org.apache.stratos.messaging.listener.topology.*; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * Load balancer topology receiver updates load balancer context according to + * incoming topology events. + */ +public class LoadBalancerTopologyEventReceiver implements Runnable { + + private static final Log log = LogFactory.getLog(LoadBalancerTopologyEventReceiver.class); + + private TopologyEventReceiver topologyEventReceiver; + private boolean terminated; + + public LoadBalancerTopologyEventReceiver() { + this.topologyEventReceiver = new TopologyEventReceiver(); + addEventListeners(); + } + + @Override + public void run() { + Thread thread = new Thread(topologyEventReceiver); + thread.start(); + if (log.isInfoEnabled()) { + log.info("Load balancer topology receiver thread started"); + } + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + if (log.isInfoEnabled()) { + log.info("Load balancer topology receiver thread terminated"); + } + } + + private void addEventListeners() { + // Listen to topology events that affect clusters + topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { + private boolean initialized; + + @Override + protected void onEvent(Event event) { + if(!initialized) { + try { + TopologyManager.acquireReadLock(); + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (clusterHasActiveMembers(cluster)) { + LoadBalancerContextUtil.addClusterAgainstHostNames(cluster); + } else { + if (log.isDebugEnabled()) { + log.debug("Cluster does not have any active members"); + } + } + for (Member member : cluster.getMembers()) { + if (member.getStatus() == MemberStatus.Activated) { + addMemberIpsToMemberIpHostnameMap(cluster, member); + } + + } + } + } + initialized = true; + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + TopologyManager.releaseReadLock(); + } + } + } + + private boolean clusterHasActiveMembers(Cluster cluster) { + for (Member member : cluster.getMembers()) { + if (member.getStatus() == MemberStatus.Activated) { + return true; + } + } + return false; + } + }); + topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { + @Override + protected void onEvent(Event event) { + + MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; + + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(), + memberActivatedEvent.getClusterId()); + + try { + + Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service not found in topology: [service] %s", + memberActivatedEvent.getServiceName())); + } + return; + } + Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s", + memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId())); + } + return; + } + Member member = cluster.getMember(memberActivatedEvent.getMemberId()); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member not found in topology: [service] %s [cluster] %s [member] %s", + memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId(), + memberActivatedEvent.getMemberId())); + } + return; + } + + // Add member to member-ip -> hostname map + addMemberIpsToMemberIpHostnameMap(cluster, member); + + if (LoadBalancerContext.getInstance().getClusterIdClusterMap().containsCluster( + member.getClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster already exists in load balancer context: [service] %s " + + "[cluster] %s", member.getServiceName(), member.getClusterId())); + } + // At this point member is already added to the cluster object in load balancer context + return; + } + + // Add cluster to load balancer context when its first member is activated: + // Cluster not found in load balancer context, add it + LoadBalancerContextUtil.addClusterAgainstHostNames(cluster); + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(), + memberActivatedEvent.getClusterId()); + } + } + }); + topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { + @Override + protected void onEvent(Event event) { + + MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; + + TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId()); + + try { + //TopologyManager.acquireReadLock(); + + Member member = findMember(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId(), memberMaintenanceModeEvent.getMemberId()); + + if (member != null) { + removeMemberIpsFromMemberIpHostnameMap(member); + } + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId()); + } + } + }); + topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() { + @Override + protected void onEvent(Event event) { + + MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event; + TopologyManager.acquireReadLockForCluster(memberSuspendedEvent.getServiceName(), + memberSuspendedEvent.getClusterId()); + + try { + //TopologyManager.acquireReadLock(); + Member member = findMember(memberSuspendedEvent.getServiceName(), + memberSuspendedEvent.getClusterId(), memberSuspendedEvent.getMemberId()); + + if (member != null) { + removeMemberIpsFromMemberIpHostnameMap(member); + } + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberSuspendedEvent.getServiceName(), + memberSuspendedEvent.getClusterId()); + } + } + }); + topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + + //TopologyManager.acquireReadLock(); + MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; + + TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); + + try { + Member member = findMember(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId(), memberTerminatedEvent.getMemberId()); + + if (member != null) { + removeMemberIpsFromMemberIpHostnameMap(member); + } + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); + } + } + }); + topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() { + @Override + protected void onEvent(Event event) { + + // Remove cluster from context + ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event; + TopologyManager.acquireReadLockForCluster(clusterRemovedEvent.getServiceName(), + clusterRemovedEvent.getClusterId()); + + try { + AlgorithmContextMap.getInstance().removeCluster(clusterRemovedEvent.getServiceName(), + clusterRemovedEvent.getClusterId()); + } catch (Exception e) { + log.error("Could not remove cluster from load balancer algorithm context map", e); + } + + try { + Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId()); + if (cluster != null) { + for (Member member : cluster.getMembers()) { + removeMemberIpsFromMemberIpHostnameMap(member); + } + LoadBalancerContextUtil.removeClusterAgainstHostNames(cluster.getClusterId()); + } else { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster not found in load balancer context: [service] %s [cluster] %s", + clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId())); + } + } + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + TopologyManager.releaseReadLockForCluster(clusterRemovedEvent.getServiceName(), + clusterRemovedEvent.getClusterId()); + } + } + }); + topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() { + @Override + protected void onEvent(Event event) { + + ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event; + TopologyManager.acquireReadLockForService(serviceRemovedEvent.getServiceName()); + + try { + //TopologyManager.acquireReadLock(); + + // Remove all clusters of given service from context + Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName()); + if (service != null) { + for (Cluster cluster : service.getClusters()) { + for (Member member : cluster.getMembers()) { + removeMemberIpsFromMemberIpHostnameMap(member); + } + LoadBalancerContextUtil.removeClusterAgainstHostNames(cluster.getClusterId()); + } + } else { + if (log.isWarnEnabled()) { + log.warn(String.format("Service not found in topology: [service] %s", + serviceRemovedEvent.getServiceName())); + } + } + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForService(serviceRemovedEvent.getServiceName()); + } + } + }); + } + + private Member findMember(String serviceName, String clusterId, String memberId) { + Service service = TopologyManager.getTopology().getService(serviceName); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service not found in topology: [service] %s", serviceName)); + } + return null; + } + + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s", serviceName, clusterId)); + } + return null; + } + + Member member = cluster.getMember(memberId); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member not found in topology: [service] %s [cluster] %s [member] %s", serviceName, + clusterId, memberId)); + } + return null; + } + return member; + } + + private void addMemberIpsToMemberIpHostnameMap(Cluster cluster, Member member) { + if ((cluster.getHostNames() == null) || (cluster.getHostNames().size() == 0)) { + if (log.isWarnEnabled()) { + log.warn(String.format("Hostnames not found in cluster %s, could not add member ips to member-ip " + + "-> hostname map", member.getClusterId())); + } + return; + } + + String hostname = cluster.getHostNames().get(0); + if (cluster.getHostNames().size() > 1) { + if (log.isWarnEnabled()) { + log.warn(String.format("Multiple hostnames found in cluster %s, using %s", + cluster.getHostNames().toString(), hostname)); + } + } + + if (StringUtils.isNotBlank(member.getMemberIp())) { + LoadBalancerContext.getInstance().getMemberIpHostnameMap().put(member.getMemberIp(), hostname); + if (log.isDebugEnabled()) { + log.debug(String.format("Member private ip added to member-ip -> hostname map: [service] %s [cluster] " + + "%s [member] %s [private-ip] %s", member.getServiceName(), member.getClusterId(), + member.getMemberId(), member.getMemberIp() + )); + } + } + if (StringUtils.isNotBlank(member.getMemberPublicIp())) { + LoadBalancerContext.getInstance().getMemberIpHostnameMap().put(member.getMemberPublicIp(), hostname); + if (log.isDebugEnabled()) { + log.debug(String.format("Member public ip added to member-ip -> hostname map: [service] %s [cluster] " + + "%s [member] %s [public-ip] %s", member.getServiceName(), member.getClusterId(), + member.getMemberId(), member.getMemberPublicIp() + )); + } + } + } + + private void removeMemberIpsFromMemberIpHostnameMap(Member member) { + if (StringUtils.isNotBlank(member.getMemberIp())) { + LoadBalancerContext.getInstance().getMemberIpHostnameMap().remove(member.getMemberIp()); + if (log.isDebugEnabled()) { + log.debug(String.format("Member private ip removed from member-ip -> hostname map: [private-ip] %s", + member.getMemberIp())); + } + } + if (StringUtils.isNotBlank(member.getMemberPublicIp())) { + LoadBalancerContext.getInstance().getMemberIpHostnameMap().remove(member.getMemberPublicIp()); + if (log.isDebugEnabled()) { + log.debug(String.format("Member public ip removed from member-ip -> hostname map: [public-ip] %s", + member.getMemberPublicIp())); + } + } + } + + /** + * Terminate load balancer topology receiver thread. + */ + public void terminate() { + topologyEventReceiver.terminate(); + terminated = true; + } +}
