Repository: stratos
Updated Branches:
  refs/heads/stratos-4.1.x 00c810081 -> 75e7d8e54


Fixing STRATOS-1584, making thrift client able to publish to multiple endpoints.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/75e7d8e5
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/75e7d8e5
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/75e7d8e5

Branch: refs/heads/stratos-4.1.x
Commit: 75e7d8e54b37791099da16e0f72fc8ecc9d4fdda
Parents: 00c8100
Author: Sajith <[email protected]>
Authored: Tue Oct 13 17:39:14 2015 +0530
Committer: Sajith <[email protected]>
Committed: Tue Oct 13 17:44:15 2015 +0530

----------------------------------------------------------------------
 .../publisher/ThriftClientConfig.java           |  12 ++-
 .../publisher/ThriftClientConfigParser.java     | 107 ++++++++++---------
 .../statistics/publisher/ThriftClientInfo.java  |  11 ++
 .../publisher/ThriftStatisticsPublisher.java    |  81 ++++++++++----
 .../cep/WSO2CEPHealthStatisticsPublisher.java   |   2 +-
 .../test/ThriftClientConfigParserTest.java      |  55 +++++++---
 .../src/test/resources/thrift-client-config.xml |  43 +++++---
 .../src/main/conf/thrift-client-config.xml      |  50 ++++++---
 8 files changed, 240 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/75e7d8e5/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
index 7f6d8c4..d68e6db 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
@@ -20,6 +20,8 @@
 package org.apache.stratos.common.statistics.publisher;
 
 
+import java.util.List;
+
 import org.apache.commons.lang.StringUtils;
 
 /**
@@ -32,7 +34,7 @@ public class ThriftClientConfig {
     public static final String DAS_THRIFT_CLIENT_NAME = "das";
 
     private static volatile ThriftClientConfig instance;
-    private ThriftClientInfo cepThriftClientInfo, dasThriftClientInfo;
+    private List <ThriftClientInfo> cepThriftClientInfo, dasThriftClientInfo;
 
     /*
     * A private Constructor prevents any other
@@ -58,7 +60,7 @@ public class ThriftClientConfig {
     }
 
     /**
-     * Returns an ThriftClientInfo Object that stores the credential 
information.
+     * Returns a list of ThriftClientInfo objects that store the credential 
information.
      * Thrift client credential information can be found under 
thrift-client-config.xml file
      * These credential information then get parsed and assigned into 
ThriftClientInfo
      * Object.
@@ -68,7 +70,7 @@ public class ThriftClientConfig {
      * @param thriftClientName Thrift Client Name
      * @return ThriftClientInfo object which consists of username,password,ip 
and port values
      */
-    public ThriftClientInfo getThriftClientInfo(String thriftClientName) {
+    public List <ThriftClientInfo> getThriftClientInfo(String 
thriftClientName) {
         if (CEP_THRIFT_CLIENT_NAME.equals(thriftClientName)) {
             return cepThriftClientInfo;
         } else if (DAS_THRIFT_CLIENT_NAME.equals(thriftClientName)) {
@@ -84,7 +86,7 @@ public class ThriftClientConfig {
      * @param thriftClientInfo DAS Thrift Client Information
      */
 
-    public void setDASThriftClientInfo(ThriftClientInfo thriftClientInfo) {
+    public void setDASThriftClientInfo(List <ThriftClientInfo> 
thriftClientInfo) {
         this.dasThriftClientInfo = thriftClientInfo;
     }
 
@@ -95,7 +97,7 @@ public class ThriftClientConfig {
      * @param thriftClientInfo CEP Thrift Client Information
      */
 
-    public void setCEPThriftClientInfo(ThriftClientInfo thriftClientInfo) {
+    public void setCEPThriftClientInfo(List <ThriftClientInfo> 
thriftClientInfo) {
         this.cepThriftClientInfo = thriftClientInfo;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/75e7d8e5/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
index 361b56a..266a6fc 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
@@ -19,6 +19,13 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
 import org.apache.axiom.om.OMElement;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -26,9 +33,6 @@ import org.apache.stratos.common.util.AxiomXpathParserUtil;
 import org.wso2.securevault.SecretResolver;
 import org.wso2.securevault.SecretResolverFactory;
 
-import java.io.File;
-import java.util.Iterator;
-
 /**
  * Thrift client config parser.
  */
@@ -66,8 +70,8 @@ public class ThriftClientConfigParser {
             }
 
             ThriftClientConfig thriftClientIConfig = new ThriftClientConfig();
-            ThriftClientInfo cepThriftClientInfo = new ThriftClientInfo();
-            ThriftClientInfo dasThriftClientInfo = new ThriftClientInfo();
+            List<ThriftClientInfo> cepThriftClientInfoList = new 
ArrayList<ThriftClientInfo>();
+            List<ThriftClientInfo> dasThriftClientInfoList = new 
ArrayList<ThriftClientInfo>();
 
             File configFile = new File(filePath);
             if (!configFile.exists()) {
@@ -94,58 +98,65 @@ public class ThriftClientConfigParser {
             while (thriftClientIterator.hasNext()) {
                 OMElement thriftClientConfig = (OMElement) 
thriftClientIterator.next();
                 Iterator thriftClientConfigIterator = 
thriftClientConfig.getChildElements();
-                ThriftClientInfo thriftClientInfo = new ThriftClientInfo();
 
                 while (thriftClientConfigIterator.hasNext()) {
                     OMElement thriftClientConfigElement = (OMElement) 
thriftClientConfigIterator.next();
+                    Iterator thriftClientTypeItr = 
thriftClientConfigElement.getChildElements();
+                    
+                    while(thriftClientTypeItr.hasNext()) {                     
+                       OMElement thriftTypeElement = (OMElement) 
thriftClientTypeItr.next();
+                       Iterator nodeItr = thriftTypeElement.getChildElements();
+                       ThriftClientInfo thriftClientInfo = new 
ThriftClientInfo();
+                       
thriftClientInfo.setId(thriftTypeElement.getAttributeValue(new QName("id")));
+                       
+                       while(nodeItr.hasNext()) {                      
+                               OMElement nodeElement = (OMElement) 
nodeItr.next();                     
+                               
+                            if 
(STATS_PUBLISHER_ENABLED.equals(nodeElement.getQName().getLocalPart())) {
+                                statsPublisherEnabled = 
Boolean.parseBoolean(nodeElement.getText());
+                                
thriftClientInfo.setStatsPublisherEnabled(statsPublisherEnabled);
+                            }
 
-                    if 
(NAME_ELEMENT.equals(thriftClientConfigElement.getQName().getLocalPart())) {
-                        nameValuesStr = thriftClientConfigElement.getText();
-                        if (CEP_NAME_ELEMENT.equals(nameValuesStr)) {
-                            cepThriftClientInfo = thriftClientInfo;
-                        } else if (DAS_NAME_ELEMENT.equals(nameValuesStr)) {
-                            dasThriftClientInfo = thriftClientInfo;
-                        }
-                    }
-
-                    if 
(STATS_PUBLISHER_ENABLED.equals(thriftClientConfigElement.getQName().getLocalPart()))
 {
-                        statsPublisherEnabled = 
Boolean.parseBoolean(thriftClientConfigElement.getText());
-                        
thriftClientInfo.setStatsPublisherEnabled(statsPublisherEnabled);
-                    }
-
-                    if 
(USERNAME_ELEMENT.equals(thriftClientConfigElement.getQName().getLocalPart())) {
-                        userNameValuesStr = 
thriftClientConfigElement.getText();
-                        thriftClientInfo.setUsername(userNameValuesStr);
-                    }
+                            if 
(USERNAME_ELEMENT.equals(nodeElement.getQName().getLocalPart())) {
+                                userNameValuesStr = nodeElement.getText();
+                                
thriftClientInfo.setUsername(userNameValuesStr);
+                            }
 
-                    //password field protected using Secure vault
-                    if 
(PASSWORD_ELEMENT.equals(thriftClientConfigElement.getQName().getLocalPart())) {
-                        if ((secretResolver != null) && 
(secretResolver.isInitialized())) {
-                            if (secretResolver.isTokenProtected(secretAlias)) {
-                                passwordValueStr = 
secretResolver.resolve(secretAlias);
-                            } else {
-                                passwordValueStr = 
thriftClientConfigElement.getText();
+                            //password field protected using Secure vault
+                            if 
(PASSWORD_ELEMENT.equals(nodeElement.getQName().getLocalPart())) {
+                                if ((secretResolver != null) && 
(secretResolver.isInitialized())) {
+                                    if 
(secretResolver.isTokenProtected(secretAlias)) {
+                                        passwordValueStr = 
secretResolver.resolve(secretAlias);
+                                    } else {
+                                        passwordValueStr = 
nodeElement.getText();
+                                    }
+                                } else {
+                                    passwordValueStr = nodeElement.getText();
+                                }
+                                thriftClientInfo.setPassword(passwordValueStr);
                             }
-                        } else {
-                            passwordValueStr = 
thriftClientConfigElement.getText();
-                        }
-                        thriftClientInfo.setPassword(passwordValueStr);
-                    }
 
-                    if 
(IP_ELEMENT.equals(thriftClientConfigElement.getQName().getLocalPart())) {
-                        ipValuesStr = thriftClientConfigElement.getText();
-                        thriftClientInfo.setIp(ipValuesStr);
-                    }
+                            if 
(IP_ELEMENT.equals(nodeElement.getQName().getLocalPart())) {
+                                ipValuesStr = nodeElement.getText();
+                                thriftClientInfo.setIp(ipValuesStr);
+                            }
 
-                    if 
(PORT_ELEMENT.equals(thriftClientConfigElement.getQName().getLocalPart())) {
-                        portValueStr = thriftClientConfigElement.getText();
-                        thriftClientInfo.setPort(portValueStr);
+                            if 
(PORT_ELEMENT.equals(nodeElement.getQName().getLocalPart())) {
+                                portValueStr = nodeElement.getText();
+                                thriftClientInfo.setPort(portValueStr);
+                            }
+                               
+                       }
+                       
if(thriftClientConfigElement.getLocalName().equals(CEP_NAME_ELEMENT)) {
+                               cepThriftClientInfoList.add(thriftClientInfo);  
                        
+                       } else if 
(thriftClientConfigElement.getLocalName().equals(DAS_NAME_ELEMENT)) {
+                               dasThriftClientInfoList.add(thriftClientInfo);
+                       }
+                       
                     }
+                    
                 }
             }
-            if (nameValuesStr == null) {
-                throw new RuntimeException("Name value not found in thrift 
client configuration ");
-            }
 
             if (userNameValuesStr == null) {
                 throw new RuntimeException("Username value not found in thrift 
client configuration");
@@ -162,8 +173,8 @@ public class ThriftClientConfigParser {
                 throw new RuntimeException("Port not found in thrift client 
configuration ");
             }
 
-            thriftClientIConfig.setCEPThriftClientInfo(cepThriftClientInfo);
-            thriftClientIConfig.setDASThriftClientInfo(dasThriftClientInfo);
+            
thriftClientIConfig.setCEPThriftClientInfo(cepThriftClientInfoList);
+            
thriftClientIConfig.setDASThriftClientInfo(dasThriftClientInfoList);
 
             return thriftClientIConfig;
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/75e7d8e5/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
index 162c04f..dcdf3d9 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
@@ -28,6 +28,7 @@ public class ThriftClientInfo {
     private String password;
     private String ip;
     private String port;
+    private String id;
 
     public boolean isStatsPublisherEnabled() {
         return statsPublisherEnabled;
@@ -68,4 +69,14 @@ public class ThriftClientInfo {
     public void setPort(String port) {
         this.port = port;
     }
+
+       public String getId() {
+               return id;
+       }
+
+       public void setId(String id) {
+               this.id = id;
+       }
+      
+    
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/75e7d8e5/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 151137e..7d4aa6e 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -25,10 +25,16 @@ import org.wso2.carbon.databridge.agent.thrift.Agent;
 import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
 import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
 import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.agent.thrift.lb.DataPublisherHolder;
+import org.wso2.carbon.databridge.agent.thrift.lb.LoadBalancingDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverGroup;
+import org.wso2.carbon.databridge.agent.thrift.util.DataPublisherUtil;
 import org.wso2.carbon.databridge.commons.Event;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 
 /**
  * Thrift statistics publisher.
@@ -38,11 +44,8 @@ public class ThriftStatisticsPublisher implements 
StatisticsPublisher {
     private static final Log log = 
LogFactory.getLog(ThriftStatisticsPublisher.class);
 
     private StreamDefinition streamDefinition;
-    private AsyncDataPublisher asyncDataPublisher;
-    private String ip;
-    private String port;
-    private String username;
-    private String password;
+    private LoadBalancingDataPublisher loadBalancingDataPublisher;
+    private List<ThriftClientInfo> thriftClientInfoList;
     private boolean enabled = false;
 
     /**
@@ -54,30 +57,58 @@ public class ThriftStatisticsPublisher implements 
StatisticsPublisher {
      */
     public ThriftStatisticsPublisher(StreamDefinition streamDefinition, String 
thriftClientName) {
         ThriftClientConfig thriftClientConfig = 
ThriftClientConfig.getInstance();
-        ThriftClientInfo thriftClientInfo = 
thriftClientConfig.getThriftClientInfo(thriftClientName);
-
+        this.thriftClientInfoList = 
thriftClientConfig.getThriftClientInfo(thriftClientName);
         this.streamDefinition = streamDefinition;
-        this.enabled = thriftClientInfo.isStatsPublisherEnabled();
-        this.ip = thriftClientInfo.getIp();
-        this.port = thriftClientInfo.getPort();
-        this.username = thriftClientInfo.getUsername();
-        this.password = thriftClientInfo.getPassword();
 
-        if (enabled) {
+        if (isPublisherEnabled()) {
+               this.enabled = true;
             init();
         }
     }
 
-    private void init() {
-        AgentConfiguration agentConfiguration = new AgentConfiguration();
-        Agent agent = new Agent(agentConfiguration);
-
-        // Initialize asynchronous data publisher
-        asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port 
+ "", username, password, agent);
-        asyncDataPublisher.addStreamDefinition(streamDefinition);
+    private boolean isPublisherEnabled() {
+       boolean publisherEnabled = false;
+       for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
+               publisherEnabled = thriftClientInfo.isStatsPublisherEnabled();
+               if(publisherEnabled){
+                       break;
+               }
+               }       
+               return publisherEnabled;
+       }
+
+       private void init() {
+        
+        // Initialize load balancing data publisher       
+        loadBalancingDataPublisher = new 
LoadBalancingDataPublisher(getReceiverGroups());
+        loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
     }
 
-    @Override
+    private ArrayList<ReceiverGroup> getReceiverGroups() {
+       
+        ArrayList<ReceiverGroup> receiverGroups = new 
ArrayList<ReceiverGroup>();    
+        
+        for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
+               ArrayList<DataPublisherHolder> dataPublisherHolders = new 
ArrayList<DataPublisherHolder>();
+                       DataPublisherHolder aNode = new 
DataPublisherHolder(null, buildUrl(thriftClientInfo), 
thriftClientInfo.getUsername(), thriftClientInfo.getPassword());
+                       dataPublisherHolders.add(aNode);
+                       ReceiverGroup group = new 
ReceiverGroup(dataPublisherHolders);
+                       receiverGroups.add(group);
+               } 
+               return receiverGroups; 
+        
+       }
+
+       private String buildUrl(ThriftClientInfo thriftClientInfo) {
+               String url = new StringBuilder()
+                                               .append("tcp://")
+                                               
.append(thriftClientInfo.getIp())
+                                               .append(":")
+                                               
.append(thriftClientInfo.getPort()).toString();                         
+               return url;
+       }
+
+       @Override
     public void setEnabled(boolean enabled) {
         this.enabled = enabled;
         if (this.enabled) {
@@ -105,7 +136,13 @@ public class ThriftStatisticsPublisher implements 
StatisticsPublisher {
                 log.debug(String.format("Publishing thrift event: [stream] %s 
[version] %s",
                         streamDefinition.getName(), 
streamDefinition.getVersion()));
             }
-            asyncDataPublisher.publish(streamDefinition.getName(), 
streamDefinition.getVersion(), event);
+            loadBalancingDataPublisher.publish(streamDefinition.getName(), 
streamDefinition.getVersion(), event);
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Successfully Published ********  
thrift event: [stream] %s [version] %s",
+                        streamDefinition.getName(), 
streamDefinition.getVersion()));
+            }
+            
+            
         } catch (AgentException e) {
             if (log.isErrorEnabled()) {
                 log.error(String.format("Could not publish thrift event: 
[stream] %s [version] %s",

http://git-wip-us.apache.org/repos/asf/stratos/blob/75e7d8e5/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index 1ccaaa6..ea1c401 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -85,7 +85,7 @@ public class WSO2CEPHealthStatisticsPublisher extends 
ThriftStatisticsPublisher
                         String memberId, String partitionId, String health, 
double value) {
         if (log.isDebugEnabled()) {
             log.debug(String.format("Publishing health statistics: [cluster] 
%s [cluster-instance] %s " +
-                            "[network-partition] %s [partition] %s [member] %s 
[health] %s [value] %d",
+                            "[network-partition] %s [partition] %s [member] %s 
[health] %s [value] %f",
                     clusterId, clusterInstanceId, networkPartitionId, 
partitionId, memberId, health, value));
         }
         // Set payload values

http://git-wip-us.apache.org/repos/asf/stratos/blob/75e7d8e5/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
 
b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
index 1627c40..ae7e059 100644
--- 
a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
+++ 
b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
@@ -20,11 +20,13 @@
 package org.apache.stratos.common.test;
 
 import junit.framework.TestCase;
+
 import org.apache.stratos.common.statistics.publisher.ThriftClientConfig;
 import org.apache.stratos.common.statistics.publisher.ThriftClientInfo;
 import org.junit.Test;
 
 import java.net.URL;
+import java.util.List;
 
 
 /**
@@ -42,21 +44,48 @@ public class ThriftClientConfigParserTest extends TestCase {
         URL configFileUrl = 
ThriftClientConfigParserTest.class.getResource("/thrift-client-config.xml");
         System.setProperty(ThriftClientConfig.THRIFT_CLIENT_CONFIG_FILE_PATH, 
configFileUrl.getPath());
         ThriftClientConfig thriftClientConfig = 
ThriftClientConfig.getInstance();
-        ThriftClientInfo cepThriftClientInfo = 
thriftClientConfig.getThriftClientInfo(
+        List <ThriftClientInfo> cepList = 
thriftClientConfig.getThriftClientInfo(
                 ThriftClientConfig.CEP_THRIFT_CLIENT_NAME);
-        ThriftClientInfo dasThriftClientInfo = 
thriftClientConfig.getThriftClientInfo(
+        List <ThriftClientInfo> dasList = 
thriftClientConfig.getThriftClientInfo(
                 ThriftClientConfig.DAS_THRIFT_CLIENT_NAME);
+        ThriftClientInfo cepNode1 = null;
+        ThriftClientInfo cepNode2 = null;
+        ThriftClientInfo dasNode1 = null;
+        
+        for (ThriftClientInfo cepNodeInfo : cepList) {
+                       if(cepNodeInfo.getId().equals("node-01")) {
+                               cepNode1 = cepNodeInfo;
+                       }else if(cepNodeInfo.getId().equals("node-02")) {
+                               cepNode2 = cepNodeInfo;
+                       }
+               }
+                
+        for (ThriftClientInfo dasNodeInfo : dasList) {
+                       if(dasNodeInfo.getId().equals("node-01")) {
+                               dasNode1 = dasNodeInfo;
+                       }
+               }
+
+        // CEP-node1
+        assertEquals("CEP Stats Publisher not 
enabled",true,cepNode1.isStatsPublisherEnabled());        
+        assertEquals("Incorrect Username", "admincep1", 
cepNode1.getUsername());
+        assertEquals("Incorrect Password", "1234cep1", cepNode1.getPassword());
+        assertEquals("Incorrect IP", "192.168.10.10", cepNode1.getIp());
+        assertEquals("Incorrect Port", "9300", cepNode1.getPort());
+        
+        // CEP-node2
+        assertEquals("CEP Stats Publisher not 
enabled",true,cepNode2.isStatsPublisherEnabled());        
+        assertEquals("Incorrect Username", "admincep2", 
cepNode2.getUsername());
+        assertEquals("Incorrect Password", "1234cep2", cepNode2.getPassword());
+        assertEquals("Incorrect IP", "192.168.10.20", cepNode2.getIp());
+        assertEquals("Incorrect Port", "9300", cepNode2.getPort());
 
-        assertEquals("CEP Stats Publisher not 
enabled",true,cepThriftClientInfo.isStatsPublisherEnabled());
-        assertEquals("Incorrect Username", "admin", 
cepThriftClientInfo.getUsername());
-        assertEquals("Incorrect Password", "1234", 
cepThriftClientInfo.getPassword());
-        assertEquals("Incorrect IP", "192.168.10.10", 
cepThriftClientInfo.getIp());
-        assertEquals("Incorrect Port", "9300", cepThriftClientInfo.getPort());
-
-        assertEquals("DAS Stats Publisher not 
enabled",true,dasThriftClientInfo.isStatsPublisherEnabled());
-        assertEquals("Incorrect Username", "admin1", 
dasThriftClientInfo.getUsername());
-        assertEquals("Incorrect Password", "12345", 
dasThriftClientInfo.getPassword());
-        assertEquals("Incorrect IP", "192.168.10.11", 
dasThriftClientInfo.getIp());
-        assertEquals("Incorrect Port", "9301", dasThriftClientInfo.getPort());
+        // DAS node 1
+        assertEquals("DAS Stats Publisher not enabled",true, 
dasNode1.isStatsPublisherEnabled());
+        assertEquals("Incorrect Username", "admindas1", 
dasNode1.getUsername());
+        assertEquals("Incorrect Password", "1234das1", dasNode1.getPassword());
+        assertEquals("Incorrect IP", "192.168.10.11", dasNode1.getIp());
+        assertEquals("Incorrect Port", "9301", dasNode1.getPort());
+       
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/75e7d8e5/components/org.apache.stratos.common/src/test/resources/thrift-client-config.xml
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/test/resources/thrift-client-config.xml
 
b/components/org.apache.stratos.common/src/test/resources/thrift-client-config.xml
index 8b10756..f828e0d 100644
--- 
a/components/org.apache.stratos.common/src/test/resources/thrift-client-config.xml
+++ 
b/components/org.apache.stratos.common/src/test/resources/thrift-client-config.xml
@@ -20,20 +20,31 @@
 
 <!-- Apache thrift client configuration for publishing statistics to WSO2 CEP 
and WSO2 DAS -->
 <thriftClientConfiguration>
-    <config>
-        <name>cep</name>
-        <statsPublisherEnabled>true</statsPublisherEnabled>
-        <username>admin</username>
-        <password>1234</password>
-        <ip>192.168.10.10</ip>
-        <port>9300</port>
-    </config>
-    <config>
-        <name>das</name>
-        <statsPublisherEnabled>true</statsPublisherEnabled>
-        <username>admin1</username>
-        <password>12345</password>
-        <ip>192.168.10.11</ip>
-        <port>9301</port>
-    </config>
+     <config>
+        <cep>
+             <node id="node-01">
+                  <statsPublisherEnabled>true</statsPublisherEnabled>
+                  <username>admincep1</username>
+                  <password>1234cep1</password>
+                  <ip>192.168.10.10</ip>
+                  <port>9300</port>
+             </node>
+             <node id="node-02">
+                  <statsPublisherEnabled>true</statsPublisherEnabled>
+                  <username>admincep2</username>
+                  <password>1234cep2</password>
+                  <ip>192.168.10.20</ip>
+                  <port>9300</port>
+             </node>
+        </cep>
+        <das>
+             <node id="node-01">
+                  <statsPublisherEnabled>true</statsPublisherEnabled>
+                  <username>admindas1</username>
+                  <password>1234das1</password>
+                  <ip>192.168.10.11</ip>
+                  <port>9301</port>
+             </node>
+        </das>
+    </config>  
 </thriftClientConfiguration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/75e7d8e5/products/stratos/modules/distribution/src/main/conf/thrift-client-config.xml
----------------------------------------------------------------------
diff --git 
a/products/stratos/modules/distribution/src/main/conf/thrift-client-config.xml 
b/products/stratos/modules/distribution/src/main/conf/thrift-client-config.xml
index f730c42..fa684bd 100644
--- 
a/products/stratos/modules/distribution/src/main/conf/thrift-client-config.xml
+++ 
b/products/stratos/modules/distribution/src/main/conf/thrift-client-config.xml
@@ -20,20 +20,38 @@
 
 <!-- Apache thrift client configuration for publishing statistics to WSO2 CEP 
and WSO2 DAS-->
 <thriftClientConfiguration>
-    <config>
-        <name>cep</name>
-        <statsPublisherEnabled>true</statsPublisherEnabled>
-        <username>admin</username>
-        <password>admin</password>
-        <ip>localhost</ip>
-        <port>7611</port>
+     <config>
+        <cep>
+             <node id="node-01">
+                  <statsPublisherEnabled>true</statsPublisherEnabled>
+                  <username>admin</username>
+                  <password>admin</password>
+                  <ip>10.10.1.1</ip>
+                  <port>7711</port>
+             </node>
+             <!--<node id="node-02">
+                  <statsPublisherEnabled>true</statsPublisherEnabled>
+                  <username>admin</username>
+                  <password>admin</password>
+                  <ip>10.10.1.1</ip>
+                  <port>7714</port>
+             </node>-->
+        </cep>
+        <das>
+             <node id="node-01">
+                  <statsPublisherEnabled>false</statsPublisherEnabled>
+                  <username>admin</username>
+                  <password>admin</password>
+                  <ip>10.10.1.1</ip>
+                  <port>7712</port>
+             </node>
+             <!--<node id="node-02">
+                  <statsPublisherEnabled>true</statsPublisherEnabled>
+                  <username>admin</username>
+                  <password>admin</password>
+                  <ip>10.10.1.1</ip>
+                  <port>7713</port>
+             </node>-->
+        </das>
     </config>
-    <config>
-        <name>das</name>
-        <statsPublisherEnabled>false</statsPublisherEnabled>
-        <username>admin</username>
-        <password>admin</password>
-        <ip>localhost</ip>
-        <port>7612</port>
-    </config>
-</thriftClientConfiguration>
\ No newline at end of file
+</thriftClientConfiguration>

Reply via email to