Updated Branches: refs/heads/master 4c2962f53 -> 221f3dbfe
adding a CompleteTopologyEventProcessor Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/221f3dbf Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/221f3dbf Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/221f3dbf Branch: refs/heads/master Commit: 221f3dbfea74b63344f3b6d3357bdec77139d3e1 Parents: 4c2962f Author: Isuru <[email protected]> Authored: Wed Dec 11 15:34:11 2013 +0530 Committer: Isuru <[email protected]> Committed: Wed Dec 11 15:34:11 2013 +0530 ---------------------------------------------------------------------- .../CompleteTopologyEventProcessor.java | 122 +++++++++++++++++++ .../processor/InstanceStatusProcessor.java | 2 + .../processor/TopologyEventProcessorChain.java | 15 ++- 3 files changed, 133 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/221f3dbf/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/CompleteTopologyEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/CompleteTopologyEventProcessor.java new file mode 100644 index 0000000..c37355f --- /dev/null +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/CompleteTopologyEventProcessor.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.adc.mgt.topology.event.processor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo; +import org.apache.stratos.adc.mgt.topology.model.TopologyClusterInformationModel; +import org.apache.stratos.adc.mgt.utils.PersistenceManager; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; +import java.util.Collection; + +public class CompleteTopologyEventProcessor extends TopologyEventProcessor { + + private static final Log log = LogFactory.getLog(CompleteTopologyEventProcessor.class); + + @Override + public void process(Message message) { + + doProcessing(message); + //go to next processor in the chain + if(nextTopologyEventProcessor != null) { + nextTopologyEventProcessor.process(message); + } + } + + private void doProcessing (Message message) { + + String messageType = null; + + try { + messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME); + + } catch (JMSException e) { + log.error("Error in getting message type from received Message " + message.getClass().toString(), e); + return; + } + + log.info("Received Complete Topology message: " + messageType); + + if (CompleteTopologyEvent.class.getName().equals(messageType)) { + log.info("Received message: " + messageType); + + CompleteTopologyEvent event = getCompleteTopologyEvent(message); + //get all services + Collection<Service> serviceCollection = event.getTopology().getServices(); + //iterate through the services + for (Service service : serviceCollection) { + //get all clusters + Collection<Cluster> clusterCollection = service.getClusters(); + //iterate through the clusters + for (Cluster cluster : clusterCollection) { + CartridgeSubscriptionInfo cartridgeSubscriptionInfo = + getCartridgeSubscriptionInfo(cluster.getClusterId()); + + if(cartridgeSubscriptionInfo != null) { + //add the information to Topology Cluster Info. model + TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(), + cartridgeSubscriptionInfo.getCartridge(), + cartridgeSubscriptionInfo.getAlias(), cluster); + } + } + } + } + } + + private CompleteTopologyEvent getCompleteTopologyEvent (Message message) { + + String json = null; + try { + json = ((TextMessage)message).getText(); + + } catch (JMSException e) { + log.error("Error in getting Json message type from received Message ", e); + return null; + } + CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(json, CompleteTopologyEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received message details: [ " + + "Services: " + event.getTopology().getServices().toString() + " ]"); + } + + return event; + } + + private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) { + + try { + return PersistenceManager.getSubscriptionFromClusterId(clusterDomain); + + } catch (Exception e) { + log.error("Error getting subscription information for cluster " + clusterDomain, e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/221f3dbf/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java index d034962..1c13266 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java @@ -72,6 +72,8 @@ public class InstanceStatusProcessor extends TopologyEventProcessor { return; } + log.info("Received Topology message: " + messageType); + if (MemberStartedEvent.class.getName().equals(messageType)) { log.info("Received message: " + messageType); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/221f3dbf/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java index 5c25c59..1760449 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java @@ -23,11 +23,13 @@ import javax.jms.Message; public class TopologyEventProcessorChain { - private TopologyEventProcessor firstTopologyEventProcessor = null; + private TopologyEventProcessor completeTopologyEventProcessor = null; + private TopologyEventProcessor instanceStatusEventProcessor = null; private static TopologyEventProcessorChain topologyEventProcessorChain; private TopologyEventProcessorChain () { - firstTopologyEventProcessor = new InstanceStatusProcessor(); + completeTopologyEventProcessor = new CompleteTopologyEventProcessor(); + instanceStatusEventProcessor = new InstanceStatusProcessor(); } public static TopologyEventProcessorChain getInstance () { @@ -46,13 +48,14 @@ public class TopologyEventProcessorChain { public void initProcessorChain () { //if any other topology event processors are added, link them as follows - //firstTopologyEventProcessor.setNext(secondTopologyeventProcessor); - //secondTopologyeventProcessor.setNext(null); - firstTopologyEventProcessor.setNext(null); + //instanceStatusEventProcessor.setNext(nextTopologyeventProcessor); + //nextTopologyeventProcessor.setNext(null); + completeTopologyEventProcessor.setNext(instanceStatusEventProcessor); + instanceStatusEventProcessor.setNext(null); } public void startProcessing (Message message) { - firstTopologyEventProcessor.process(message); + completeTopologyEventProcessor.process(message); }
