Updated Branches: refs/heads/master 8a036e5f7 -> 3e2d598a5
using the event listner model without a custom event processor chain Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/04bed7e6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/04bed7e6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/04bed7e6 Branch: refs/heads/master Commit: 04bed7e6fd2bbdf6a1defe00127cc6f1e633786d Parents: 64bc33d Author: Isuru <[email protected]> Authored: Thu Dec 12 22:46:19 2013 +0530 Committer: Isuru <[email protected]> Committed: Thu Dec 12 22:46:19 2013 +0530 ---------------------------------------------------------------------- .../internal/ADCManagementServerComponent.java | 23 +- .../StratosManagerTopologyReceiver.java | 293 +++++++++++++++++++ 2 files changed, 310 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/04bed7e6/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java index 0b9c7b8..8999c9f 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java @@ -21,15 +21,14 @@ package org.apache.stratos.adc.mgt.internal; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.adc.mgt.listener.InstanceStatusListener; -import 
org.apache.stratos.adc.mgt.listener.TopologyEventListner; import org.apache.stratos.adc.mgt.publisher.TenantEventPublisher; import org.apache.stratos.adc.mgt.publisher.TenantSynchronizerTaskScheduler; +import org.apache.stratos.adc.mgt.topology.receiver.StratosManagerTopologyReceiver; import org.apache.stratos.adc.mgt.utils.CartridgeConfigFileReader; import org.apache.stratos.adc.mgt.utils.StratosDBUtils; import org.apache.stratos.adc.topology.mgt.service.TopologyManagementService; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; -import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver; import org.apache.stratos.messaging.util.Constants; import org.osgi.service.component.ComponentContext; import org.wso2.carbon.ntask.core.service.TaskService; @@ -66,7 +65,9 @@ import org.wso2.carbon.utils.ConfigurationContextService; */ public class ADCManagementServerComponent { + private static final Log log = LogFactory.getLog(ADCManagementServerComponent.class); + private StratosManagerTopologyReceiver stratosManagerTopologyReceiver; protected void activate(ComponentContext componentContext) throws Exception { try { @@ -99,7 +100,7 @@ public class ADCManagementServerComponent { tsubscriber.start(); //initializing the topology event subscriber - TopicSubscriber topologyTopicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC); + /*TopicSubscriber topologyTopicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC); topologyTopicSubscriber.setMessageListener(new TopologyEventListner()); Thread topologyTopicSubscriberThread = new Thread(topologyTopicSubscriber); topologyTopicSubscriberThread.start(); @@ -107,11 +108,15 @@ public class ADCManagementServerComponent { //Starting Topology Receiver TopologyReceiver topologyReceiver = new TopologyReceiver(); Thread topologyReceiverThread = new Thread(topologyReceiver); + topologyReceiverThread.start();*/ + + 
stratosManagerTopologyReceiver = new StratosManagerTopologyReceiver(); + Thread topologyReceiverThread = new Thread(stratosManagerTopologyReceiver); topologyReceiverThread.start(); + log.info("Topology receiver thread started"); - if (log.isInfoEnabled()) { - log.info("ADC management server component is activated"); - } + //Component activated successfully + log.info("ADC management server component is activated"); } catch (Exception e) { if(log.isFatalEnabled()) { @@ -170,4 +175,10 @@ public class ADCManagementServerComponent { } ServiceReferenceHolder.getInstance().setTaskService(null); } + + protected void deactivate(ComponentContext context) { + + //terminate Stratos Manager Topology Receiver + stratosManagerTopologyReceiver.terminate(); + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/04bed7e6/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/receiver/StratosManagerTopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/receiver/StratosManagerTopologyReceiver.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/receiver/StratosManagerTopologyReceiver.java new file mode 100644 index 0000000..75fdbc9 --- /dev/null +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/receiver/StratosManagerTopologyReceiver.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.receiver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
+import org.apache.stratos.adc.mgt.topology.model.TopologyClusterInformationModel;
+import org.apache.stratos.adc.mgt.utils.PersistenceManager;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.listener.topology.*;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
+
+/**
+ * Runnable that registers Stratos Manager's topology event listeners on a
+ * {@link TopologyMessageProcessorChain}, hands that chain to a
+ * {@link TopologyReceiver} through a {@link TopologyEventMessageDelegator},
+ * and keeps its own thread alive until {@link #terminate()} is called.
+ *
+ * <p>The listeners mirror cluster information from the topology into
+ * {@link TopologyClusterInformationModel}, keyed by the tenant id, cartridge
+ * and alias of the subscription that owns the cluster.
+ */
+public class StratosManagerTopologyReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(StratosManagerTopologyReceiver.class);
+
+    private final TopologyReceiver stratosManagerTopologyReceiver;
+
+    /** Monitor guarding {@link #terminate}; run() waits on it instead of spinning. */
+    private final Object terminationLock = new Object();
+
+    /** Set by {@link #terminate()}; only read or written while holding terminationLock. */
+    private boolean terminate;
+
+    /**
+     * Builds the listener chain and the underlying {@link TopologyReceiver}.
+     * Nothing is started until {@link #run()} is invoked.
+     */
+    public StratosManagerTopologyReceiver() {
+        this.terminate = false;
+        this.stratosManagerTopologyReceiver = new TopologyReceiver(createMessageDelegator());
+    }
+
+    /**
+     * Creates the message delegator backed by this class's listener chain.
+     *
+     * @return a delegator wrapping the chain from {@link #createEventProcessorChain()}
+     */
+    private TopologyEventMessageDelegator createMessageDelegator() {
+        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+        return new TopologyEventMessageDelegator(processorChain);
+    }
+
+    /**
+     * Creates the processor chain and registers one listener per topology
+     * event type this component reacts to.
+     *
+     * @return the populated processor chain
+     */
+    private TopologyMessageProcessorChain createEventProcessorChain() {
+
+        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
+
+        // Complete Topology event: walk every cluster of every service and
+        // record those that have a matching subscription.
+        processorChain.addEventListener(new CompleteTopologyEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // Acquire before entering try so that finally never releases
+                // a lock whose acquisition failed.
+                TopologyManager.acquireReadLock();
+                try {
+                    for (Service service : TopologyManager.getTopology().getServices()) {
+                        for (Cluster cluster : service.getClusters()) {
+                            CartridgeSubscriptionInfo cartridgeSubscriptionInfo =
+                                    getCartridgeSubscriptionInfo(cluster.getClusterId());
+
+                            if (cartridgeSubscriptionInfo != null) {
+                                TopologyClusterInformationModel.getInstance().addCluster(
+                                        cartridgeSubscriptionInfo.getTenantId(),
+                                        cartridgeSubscriptionInfo.getCartridge(),
+                                        cartridgeSubscriptionInfo.getAlias(), cluster);
+                            }
+                        }
+                    }
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        // Cluster Created event listener
+        processorChain.addEventListener(new ClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                addClusterForSubscription(((ClusterCreatedEvent) event).getClusterId());
+            }
+        });
+
+        // Cluster Removed event listener
+        processorChain.addEventListener(new ClusterRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+
+                CartridgeSubscriptionInfo cartridgeSubscriptionInfo =
+                        getCartridgeSubscriptionInfo(clusterRemovedEvent.getClusterId());
+
+                if (cartridgeSubscriptionInfo != null) {
+                    // remove the information from Topology Cluster Info. model
+                    TopologyClusterInformationModel.getInstance().removeCluster(
+                            cartridgeSubscriptionInfo.getTenantId(),
+                            cartridgeSubscriptionInfo.getCartridge(),
+                            cartridgeSubscriptionInfo.getAlias());
+                }
+            }
+        });
+
+        // The four member events below all re-add the owning cluster to the
+        // model through the same helper.
+
+        // Member Started event listener
+        processorChain.addEventListener(new MemberStartedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                addClusterForSubscription(((MemberStartedEvent) event).getClusterId());
+            }
+        });
+
+        // Member Activated event listener
+        processorChain.addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                addClusterForSubscription(((MemberActivatedEvent) event).getClusterId());
+            }
+        });
+
+        // Member Suspended event listener
+        processorChain.addEventListener(new MemberSuspendedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                addClusterForSubscription(((MemberSuspendedEvent) event).getClusterId());
+            }
+        });
+
+        // Member Terminated event listener
+        processorChain.addEventListener(new MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                addClusterForSubscription(((MemberTerminatedEvent) event).getClusterId());
+            }
+        });
+
+        return processorChain;
+    }
+
+    /**
+     * Looks up the subscription for the given cluster id, reads the matching
+     * cluster from the topology under the read lock, and adds it to the
+     * Topology Cluster Information model. Does nothing when no subscription
+     * is found.
+     *
+     * @param clusterId cluster id carried by the topology event
+     */
+    private void addClusterForSubscription(String clusterId) {
+
+        CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterId);
+        if (cartridgeSubscriptionInfo == null) {
+            return;
+        }
+
+        Cluster cluster;
+        TopologyManager.acquireReadLock();
+        try {
+            Service service = TopologyManager.getTopology().getService(cartridgeSubscriptionInfo.getCartridge());
+            if (service == null) {
+                // NOTE(review): assumes getService() can return null for an unknown
+                // service name - confirm against Topology. Guarding here avoids a
+                // NullPointerException escaping from the listener callback.
+                log.warn("Service " + cartridgeSubscriptionInfo.getCartridge() +
+                        " not found in topology, cluster " + clusterId + " not added");
+                return;
+            }
+            cluster = service.getCluster(cartridgeSubscriptionInfo.getClusterDomain());
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+
+        // add the information to Topology Cluster Info. model
+        TopologyClusterInformationModel.getInstance().addCluster(
+                cartridgeSubscriptionInfo.getTenantId(),
+                cartridgeSubscriptionInfo.getCartridge(),
+                cartridgeSubscriptionInfo.getAlias(), cluster);
+    }
+
+    /**
+     * Fetches the subscription that owns the given cluster.
+     *
+     * @param clusterDomain cluster id to look up
+     * @return the subscription, or {@code null} when the lookup throws
+     *         (the error is logged) or when the lookup itself returns null
+     */
+    private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo(String clusterDomain) {
+
+        try {
+            return PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+        } catch (Exception e) {
+            log.error("Error getting subscription information for cluster " + clusterDomain, e);
+            return null;
+        }
+    }
+
+    /**
+     * Starts the underlying {@link TopologyReceiver} on its own thread and then
+     * blocks until {@link #terminate()} is called or this thread is interrupted.
+     */
+    @Override
+    public void run() {
+
+        Thread thread = new Thread(stratosManagerTopologyReceiver);
+        thread.start();
+        log.info("Stratos Manager topology receiver thread started");
+
+        // Block on the monitor rather than spinning on the flag: the wait uses
+        // no CPU, and the lock makes the write in terminate() visible here.
+        synchronized (terminationLock) {
+            while (!terminate) {
+                try {
+                    terminationLock.wait();
+                } catch (InterruptedException e) {
+                    // Preserve the interrupt status for the caller and stop waiting.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+        log.info("Stratos Manager topology receiver thread terminated");
+    }
+
+    /**
+     * Terminates the underlying {@link TopologyReceiver} and wakes up
+     * {@link #run()} so that this runnable's thread can finish.
+     */
+    public void terminate() {
+        stratosManagerTopologyReceiver.terminate();
+        synchronized (terminationLock) {
+            terminate = true;
+            terminationLock.notifyAll();
+        }
+    }
+}
