Refactroing thrift publisher classes

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/af13aeba
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/af13aeba
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/af13aeba

Branch: refs/heads/stratos-4.1.x
Commit: af13aebae4b78c9ba4df0286eb081d48794fc427
Parents: 962ce94
Author: Thanuja <[email protected]>
Authored: Mon Nov 23 11:23:15 2015 +0530
Committer: Thanuja <[email protected]>
Committed: Mon Nov 23 11:28:38 2015 +0530

----------------------------------------------------------------------
 .../publisher/AutoscalerPublisherFactory.java   |  5 +-
 .../publisher/DASScalingDecisionPublisher.java  | 20 ++++-
 .../autoscaler/util/AutoscalerConstants.java    |  1 +
 .../CloudControllerPublisherFactory.java        |  9 +-
 .../DASMemberInformationPublisher.java          | 18 +++-
 .../publisher/DASMemberStatusPublisher.java     | 17 +++-
 .../util/CloudControllerConstants.java          |  1 +
 ...InvalidStatisticsPublisherTypeException.java | 30 +++++++
 .../HealthStatisticsPublisherFactory.java       |  5 +-
 .../InFlightRequestPublisherFactory.java        |  5 +-
 .../publisher/ThriftStatisticsPublisher.java    | 92 ++++++++++----------
 .../cep/WSO2CEPHealthStatisticsPublisher.java   | 15 +++-
 .../cep/WSO2CEPInFlightRequestPublisher.java    | 15 +++-
 13 files changed, 165 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
index d057108..8c688ba 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.autoscaler.statistics.publisher;
 
+import 
org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
 import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 
 /**
@@ -29,9 +30,9 @@ public class AutoscalerPublisherFactory {
     public static ScalingDecisionPublisher 
createScalingDecisionPublisher(StatisticsPublisherType type) {
 
         if (type == StatisticsPublisherType.WSO2DAS) {
-            return new DASScalingDecisionPublisher();
+            return DASScalingDecisionPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid 
statistics publisher type is used to create publisher.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
index a907043..097c568 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -38,22 +38,36 @@ import java.util.concurrent.ExecutorService;
 public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher 
implements ScalingDecisionPublisher {
 
     private static final Log log = 
LogFactory.getLog(DASScalingDecisionPublisher.class);
+    private static volatile DASScalingDecisionPublisher 
dasScalingDecisionPublisher;
     private static final String DATA_STREAM_NAME = "scaling_decision";
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
     private ExecutorService executorService;
 
     public DASScalingDecisionPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = 
StratosThreadPool.getExecutorService("autoscaler.stats.publisher.thread.pool", 
10);
+        executorService = 
StratosThreadPool.getExecutorService(AutoscalerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                STATS_PUBLISHER_THREAD_POOL_SIZE);
+    }
+
+    public static DASScalingDecisionPublisher getInstance() {
+        if (dasScalingDecisionPublisher == null) {
+            synchronized (DASScalingDecisionPublisher.class) {
+                if (dasScalingDecisionPublisher == null) {
+                    dasScalingDecisionPublisher = new 
DASScalingDecisionPublisher();
+                }
+            }
+        }
+        return dasScalingDecisionPublisher;
     }
 
     private static StreamDefinition createStreamDefinition() {
         try {
             // Create stream definition
             StreamDefinition streamDefinition = new 
StreamDefinition(DATA_STREAM_NAME, VERSION);
-            streamDefinition.setNickName("Member Information");
-            streamDefinition.setDescription("Member Information");
+            streamDefinition.setNickName("Scaling Decision");
+            streamDefinition.setDescription("Scaling Decision");
             List<Attribute> payloadData = new ArrayList<Attribute>();
 
             // Set payload definition

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
index 997ab0c..ef12983 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
@@ -72,6 +72,7 @@ public final class AutoscalerConstants {
     public static final String PAYLOAD_DEPLOYMENT = "default";
 
     public static final String MONITOR_THREAD_POOL_ID = "monitor.thread.pool";
+    public static final String STATS_PUBLISHER_THREAD_POOL_ID = 
"autoscaler.stats.publisher.thread.pool";
     public static final String MONITOR_THREAD_POOL_SIZE = 
"monitor.thread.pool.size";
     public static final String CLUSTER_MONITOR_SCHEDULER_ID = 
"cluster.monitor.scheduler";
     public static final String MEMBER_FAULT_EVENT_NAME = "member_fault";

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
index db68396..87d7ab9 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.cloud.controller.statistics.publisher;
 
+import 
org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
 import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 
 /**
@@ -33,9 +34,9 @@ public class CloudControllerPublisherFactory {
      */
     public static MemberInformationPublisher 
createMemberInformationPublisher(StatisticsPublisherType type) {
         if (type == StatisticsPublisherType.WSO2DAS) {
-            return new DASMemberInformationPublisher();
+            return DASMemberInformationPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid 
statistics publisher type is used to create publisher.");
         }
     }
 
@@ -47,9 +48,9 @@ public class CloudControllerPublisherFactory {
      */
     public static MemberStatusPublisher 
createMemberStatusPublisher(StatisticsPublisherType type) {
         if (type == StatisticsPublisherType.WSO2DAS) {
-            return new DASMemberStatusPublisher();
+            return DASMemberStatusPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid 
statistics publisher type is used to create publisher.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index d0dcc49..4ab65e1 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -44,16 +44,28 @@ import java.util.concurrent.ExecutorService;
 public class DASMemberInformationPublisher extends ThriftStatisticsPublisher 
implements MemberInformationPublisher {
 
     private static final Log log = 
LogFactory.getLog(DASMemberInformationPublisher.class);
-
+    private static volatile DASMemberInformationPublisher 
dasMemberInformationPublisher;
     private static final String DATA_STREAM_NAME = "member_info";
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
     private static final String VALUE_NOT_FOUND = "Value Not Found";
     private ExecutorService executorService;
 
-    public DASMemberInformationPublisher() {
+    private DASMemberInformationPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = 
StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool",
 10);
+        executorService = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
 STATS_PUBLISHER_THREAD_POOL_SIZE);
+    }
+
+    public static DASMemberInformationPublisher getInstance() {
+        if (dasMemberInformationPublisher == null) {
+            synchronized (DASMemberInformationPublisher.class) {
+                if (dasMemberInformationPublisher == null) {
+                    dasMemberInformationPublisher = new 
DASMemberInformationPublisher();
+                }
+            }
+        }
+        return dasMemberInformationPublisher;
     }
 
     private static StreamDefinition createStreamDefinition() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 877256d..332bbba 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -38,14 +38,27 @@ import java.util.concurrent.ExecutorService;
 public class DASMemberStatusPublisher extends ThriftStatisticsPublisher 
implements MemberStatusPublisher {
 
     private static final Log log = 
LogFactory.getLog(DASMemberStatusPublisher.class);
+    private static volatile DASMemberStatusPublisher dasMemberStatusPublisher;
     private static final String DATA_STREAM_NAME = "member_lifecycle";
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
     private ExecutorService executorService;
 
-    public DASMemberStatusPublisher() {
+    private DASMemberStatusPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = 
StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool",
 10);
+        executorService = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
 STATS_PUBLISHER_THREAD_POOL_SIZE);
+    }
+
+    public static DASMemberStatusPublisher getInstance() {
+        if (dasMemberStatusPublisher == null) {
+            synchronized (DASMemberStatusPublisher.class) {
+                if (dasMemberStatusPublisher == null) {
+                    dasMemberStatusPublisher = new DASMemberStatusPublisher();
+                }
+            }
+        }
+        return dasMemberStatusPublisher;
     }
 
     private static StreamDefinition createStreamDefinition() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
index c025bb4..ccd8d34 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
@@ -162,6 +162,7 @@ public final class CloudControllerConstants {
     public static final String TIMESTAMP_COL = "timestamp";
     public static final String SCALING_DECISION_ID_COL = "scaling_decision_id";
 
+    public static final String STATS_PUBLISHER_THREAD_POOL_ID = 
"cloud.controller.stats.publisher.thread.pool";
 
     /**
      * Properties

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
new file mode 100644
index 0000000..09efa1e
--- /dev/null
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.exception;
+
+/**
+ * This exception will be thrown when trying to create a publisher with 
invalid statistics publisher type.
+ */
+public class InvalidStatisticsPublisherTypeException extends Exception {
+
+    public InvalidStatisticsPublisherTypeException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
index e4047ab..bf67c1b 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import 
org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
 import 
org.apache.stratos.common.statistics.publisher.wso2.cep.WSO2CEPHealthStatisticsPublisher;
 
 /**
@@ -28,9 +29,9 @@ public class HealthStatisticsPublisherFactory {
 
     public static HealthStatisticsPublisher 
createHealthStatisticsPublisher(StatisticsPublisherType type) {
         if (type == StatisticsPublisherType.WSO2CEP) {
-            return new WSO2CEPHealthStatisticsPublisher();
+            return WSO2CEPHealthStatisticsPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid 
statistics publisher type is used to create publisher.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
index c942bce..a4b9db8 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import 
org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException;
 import 
org.apache.stratos.common.statistics.publisher.wso2.cep.WSO2CEPInFlightRequestPublisher;
 
 /**
@@ -28,9 +29,9 @@ public class InFlightRequestPublisherFactory {
 
     public static InFlightRequestPublisher 
createInFlightRequestPublisher(StatisticsPublisherType type) {
         if (type == StatisticsPublisherType.WSO2CEP) {
-            return new WSO2CEPInFlightRequestPublisher();
+            return WSO2CEPInFlightRequestPublisher.getInstance();
         } else {
-            throw new RuntimeException("Unknown statistics publisher type");
+            throw new InvalidStatisticsPublisherTypeException("Invalid 
statistics publisher type is used to create publisher.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 7d4aa6e..16dba16 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -21,14 +21,10 @@ package org.apache.stratos.common.statistics.publisher;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
 import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
 import org.wso2.carbon.databridge.agent.thrift.lb.DataPublisherHolder;
 import org.wso2.carbon.databridge.agent.thrift.lb.LoadBalancingDataPublisher;
 import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverGroup;
-import org.wso2.carbon.databridge.agent.thrift.util.DataPublisherUtil;
 import org.wso2.carbon.databridge.commons.Event;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
 
@@ -52,8 +48,8 @@ public class ThriftStatisticsPublisher implements 
StatisticsPublisher {
      * Credential information stored inside thrift-client-config.xml file
      * is parsed and assigned into ip,port,username and password fields
      *
-     * @param streamDefinition      Thrift Event Stream Definition
-     * @param thriftClientName      Thrift Client Name
+     * @param streamDefinition Thrift Event Stream Definition
+     * @param thriftClientName Thrift Client Name
      */
     public ThriftStatisticsPublisher(StreamDefinition streamDefinition, String 
thriftClientName) {
         ThriftClientConfig thriftClientConfig = 
ThriftClientConfig.getInstance();
@@ -61,54 +57,58 @@ public class ThriftStatisticsPublisher implements 
StatisticsPublisher {
         this.streamDefinition = streamDefinition;
 
         if (isPublisherEnabled()) {
-               this.enabled = true;
+            this.enabled = true;
             init();
         }
     }
 
     private boolean isPublisherEnabled() {
-       boolean publisherEnabled = false;
-       for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
-               publisherEnabled = thriftClientInfo.isStatsPublisherEnabled();
-               if(publisherEnabled){
-                       break;
-               }
-               }       
-               return publisherEnabled;
-       }
-
-       private void init() {
-        
+        boolean publisherEnabled = false;
+        for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
+            publisherEnabled = thriftClientInfo.isStatsPublisherEnabled();
+            if (publisherEnabled) {
+                break;
+            }
+        }
+        return publisherEnabled;
+    }
+
+    private void init() {
+
         // Initialize load balancing data publisher       
         loadBalancingDataPublisher = new 
LoadBalancingDataPublisher(getReceiverGroups());
-        loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
+
+        //adding stream definition
+        if 
(!loadBalancingDataPublisher.isStreamDefinitionAdded(streamDefinition)) {
+            loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
+        }
     }
 
     private ArrayList<ReceiverGroup> getReceiverGroups() {
-       
-        ArrayList<ReceiverGroup> receiverGroups = new 
ArrayList<ReceiverGroup>();    
-        
+
+        ArrayList<ReceiverGroup> receiverGroups = new 
ArrayList<ReceiverGroup>();
+
         for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) {
-               ArrayList<DataPublisherHolder> dataPublisherHolders = new 
ArrayList<DataPublisherHolder>();
-                       DataPublisherHolder aNode = new 
DataPublisherHolder(null, buildUrl(thriftClientInfo), 
thriftClientInfo.getUsername(), thriftClientInfo.getPassword());
-                       dataPublisherHolders.add(aNode);
-                       ReceiverGroup group = new 
ReceiverGroup(dataPublisherHolders);
-                       receiverGroups.add(group);
-               } 
-               return receiverGroups; 
-        
-       }
-
-       private String buildUrl(ThriftClientInfo thriftClientInfo) {
-               String url = new StringBuilder()
-                                               .append("tcp://")
-                                               
.append(thriftClientInfo.getIp())
-                                               .append(":")
-                                               
.append(thriftClientInfo.getPort()).toString();                         
-               return url;
-       }
-
-       @Override
+            ArrayList<DataPublisherHolder> dataPublisherHolders = new 
ArrayList<DataPublisherHolder>();
+            DataPublisherHolder aNode = new DataPublisherHolder(null, 
buildUrl(thriftClientInfo), thriftClientInfo.getUsername(), 
thriftClientInfo.getPassword());
+            dataPublisherHolders.add(aNode);
+            ReceiverGroup group = new ReceiverGroup(dataPublisherHolders);
+            receiverGroups.add(group);
+        }
+        return receiverGroups;
+
+    }
+
+    private String buildUrl(ThriftClientInfo thriftClientInfo) {
+        String url = new StringBuilder()
+                .append("tcp://")
+                .append(thriftClientInfo.getIp())
+                .append(":")
+                .append(thriftClientInfo.getPort()).toString();
+        return url;
+    }
+
+    @Override
     public void setEnabled(boolean enabled) {
         this.enabled = enabled;
         if (this.enabled) {
@@ -138,11 +138,11 @@ public class ThriftStatisticsPublisher implements 
StatisticsPublisher {
             }
             loadBalancingDataPublisher.publish(streamDefinition.getName(), 
streamDefinition.getVersion(), event);
             if (log.isDebugEnabled()) {
-                log.debug(String.format("Successfully Published ********  
thrift event: [stream] %s [version] %s",
+                log.debug(String.format("Successfully Published thrift event: 
[stream] %s [version] %s",
                         streamDefinition.getName(), 
streamDefinition.getVersion()));
             }
-            
-            
+
+
         } catch (AgentException e) {
             if (log.isErrorEnabled()) {
                 log.error(String.format("Could not publish thrift event: 
[stream] %s [version] %s",

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index ea1c401..d025c33 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -36,15 +36,26 @@ import java.util.List;
 public class WSO2CEPHealthStatisticsPublisher extends 
ThriftStatisticsPublisher implements HealthStatisticsPublisher {
 
     private static final Log log = 
LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
-
+    private static volatile WSO2CEPHealthStatisticsPublisher 
wso2CEPHealthStatisticsPublisher;
     private static final String DATA_STREAM_NAME = 
"cartridge_agent_health_stats";
     private static final String VERSION = "1.0.0";
     private static final String CEP_THRIFT_CLIENT_NAME = "cep";
 
-    public WSO2CEPHealthStatisticsPublisher() {
+    private WSO2CEPHealthStatisticsPublisher() {
         super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
     }
 
+    public static WSO2CEPHealthStatisticsPublisher getInstance() {
+        if (wso2CEPHealthStatisticsPublisher == null) {
+            synchronized (WSO2CEPHealthStatisticsPublisher.class) {
+                if (wso2CEPHealthStatisticsPublisher == null) {
+                    wso2CEPHealthStatisticsPublisher = new 
WSO2CEPHealthStatisticsPublisher();
+                }
+            }
+        }
+        return wso2CEPHealthStatisticsPublisher;
+    }
+
     private static StreamDefinition createStreamDefinition() {
         try {
             // Create stream definition

http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 24465c7..8c9189b 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -38,15 +38,26 @@ import java.util.List;
  */
 public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher 
implements InFlightRequestPublisher {
     private static final Log log = 
LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
-
+    private static volatile WSO2CEPInFlightRequestPublisher 
wso2CEPInFlightRequestPublisher;
     private static final String DATA_STREAM_NAME = "in_flight_requests";
     private static final String VERSION = "1.0.0";
     private static final String CEP_THRIFT_CLIENT_NAME = "cep";
 
-    public WSO2CEPInFlightRequestPublisher() {
+    private WSO2CEPInFlightRequestPublisher() {
         super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
     }
 
+    public static  WSO2CEPInFlightRequestPublisher getInstance() {
+        if (wso2CEPInFlightRequestPublisher == null) {
+            synchronized ( WSO2CEPInFlightRequestPublisher.class) {
+                if (wso2CEPInFlightRequestPublisher == null) {
+                    wso2CEPInFlightRequestPublisher = new  
WSO2CEPInFlightRequestPublisher();
+                }
+            }
+        }
+        return wso2CEPInFlightRequestPublisher;
+    }
+
     private static StreamDefinition createStreamDefinition() {
         try {
             // Create stream definition

Reply via email to