Changing publisher classes hierarchy

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/609bc9d9
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/609bc9d9
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/609bc9d9

Branch: refs/heads/stratos-4.1.x
Commit: 609bc9d9013fc3fb13d5b4a467528eb70540dccf
Parents: f3a809b
Author: Thanuja <[email protected]>
Authored: Mon Nov 23 18:24:30 2015 +0530
Committer: Thanuja <[email protected]>
Committed: Mon Nov 23 18:24:30 2015 +0530

----------------------------------------------------------------------
 .../publisher/DASScalingDecisionPublisher.java  |  5 ++--
 .../publisher/ScalingDecisionPublisher.java     | 24 ++++++++++++--------
 .../DASMemberInformationPublisher.java          |  8 +++----
 .../publisher/DASMemberStatusPublisher.java     |  8 +++----
 .../publisher/MemberInformationPublisher.java   | 12 +++++++---
 .../publisher/MemberStatusPublisher.java        | 16 +++++++++----
 ...InvalidStatisticsPublisherTypeException.java |  2 +-
 .../publisher/HealthStatisticsPublisher.java    | 12 +++++++---
 .../publisher/InFlightRequestPublisher.java     | 11 +++++++--
 .../publisher/ThriftStatisticsPublisher.java    |  4 +---
 .../cep/WSO2CEPHealthStatisticsPublisher.java   |  5 ++--
 .../cep/WSO2CEPInFlightRequestPublisher.java    | 19 ++++++++--------
 12 files changed, 76 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
index 097c568..52857d4 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -22,7 +22,6 @@ package org.apache.stratos.autoscaler.statistics.publisher;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.util.AutoscalerConstants;
-import 
org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutorService;
 /**
  * MemberInfoPublisher to publish member information/metadata to DAS.
  */
-public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher 
implements ScalingDecisionPublisher {
+public class DASScalingDecisionPublisher extends ScalingDecisionPublisher {
 
     private static final Log log = 
LogFactory.getLog(DASScalingDecisionPublisher.class);
     private static volatile DASScalingDecisionPublisher 
dasScalingDecisionPublisher;
@@ -165,7 +164,7 @@ public class DASScalingDecisionPublisher extends 
ThriftStatisticsPublisher imple
                 payload.add(activeInstanceCount);
                 payload.add(additionalInstanceCount);
                 payload.add(scalingReason);
-                DASScalingDecisionPublisher.super.publish(payload.toArray());
+                publish(payload.toArray());
             }
 
         };

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
index f7b0087..fe791f9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
@@ -19,12 +19,18 @@
 
 package org.apache.stratos.autoscaler.statistics.publisher;
 
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import 
org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 /**
  * Scaling Decision Publisher interface.
  */
-public interface ScalingDecisionPublisher extends StatisticsPublisher {
+public abstract class ScalingDecisionPublisher extends 
ThriftStatisticsPublisher {
+
+    public ScalingDecisionPublisher(StreamDefinition streamDefinition, String 
thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
+
     /**
      * Publishing scaling decision to DAS.
      *
@@ -47,11 +53,11 @@ public interface ScalingDecisionPublisher extends 
StatisticsPublisher {
      * @param additionalInstanceCount Additional Instance Needed
      * @param scalingReason           Scaling Reason
      */
-    public void publish(Long timestamp, String scalingDecisionId, String 
clusterId,
-                        int minInstanceCount, int maxInstanceCount,
-                        int rifPredicted, int rifThreshold, int 
rifRequiredInstances,
-                        int mcPredicted, int mcThreshold, int 
mcRequiredInstances,
-                        int laPredicted, int laThreshold, int 
laRequiredInstance,
-                        int requiredInstanceCount, int activeInstanceCount, 
int additionalInstanceCount,
-                        String scalingReason);
+    public abstract void publish(Long timestamp, String scalingDecisionId, 
String clusterId,
+                                 int minInstanceCount, int maxInstanceCount,
+                                 int rifPredicted, int rifThreshold, int 
rifRequiredInstances,
+                                 int mcPredicted, int mcThreshold, int 
mcRequiredInstances,
+                                 int laPredicted, int laThreshold, int 
laRequiredInstance,
+                                 int requiredInstanceCount, int 
activeInstanceCount, int additionalInstanceCount,
+                                 String scalingReason);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index 4ab65e1..621f9e2 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -27,7 +27,6 @@ import 
org.apache.stratos.cloud.controller.domain.IaasProvider;
 import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
 import org.apache.stratos.cloud.controller.domain.MemberContext;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import 
org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
@@ -41,7 +40,7 @@ import java.util.concurrent.ExecutorService;
 /**
  * MemberInfoPublisher to publish member information/metadata to DAS.
  */
-public class DASMemberInformationPublisher extends ThriftStatisticsPublisher 
implements MemberInformationPublisher {
+public class DASMemberInformationPublisher extends MemberInformationPublisher {
 
     private static final Log log = 
LogFactory.getLog(DASMemberInformationPublisher.class);
     private static volatile DASMemberInformationPublisher 
dasMemberInformationPublisher;
@@ -54,7 +53,8 @@ public class DASMemberInformationPublisher extends 
ThriftStatisticsPublisher imp
 
     private DASMemberInformationPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
 STATS_PUBLISHER_THREAD_POOL_SIZE);
+        executorService = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberInformationPublisher getInstance() {
@@ -158,7 +158,7 @@ public class DASMemberInformationPublisher extends 
ThriftStatisticsPublisher imp
                                 metadata.getOperatingSystemName(), 
metadata.getOperatingSystemVersion(),
                                 metadata.getOperatingSystemArchitecture(), 
metadata.isOperatingSystem64bit()));
                     }
-                    
DASMemberInformationPublisher.super.publish(payload.toArray());
+                    publish(payload.toArray());
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 332bbba..7a291ab 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -22,7 +22,6 @@ package 
org.apache.stratos.cloud.controller.statistics.publisher;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import 
org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutorService;
 /**
  * Publishing member status to DAS.
  */
-public class DASMemberStatusPublisher extends ThriftStatisticsPublisher 
implements MemberStatusPublisher {
+public class DASMemberStatusPublisher extends MemberStatusPublisher {
 
     private static final Log log = 
LogFactory.getLog(DASMemberStatusPublisher.class);
     private static volatile DASMemberStatusPublisher dasMemberStatusPublisher;
@@ -47,7 +46,8 @@ public class DASMemberStatusPublisher extends 
ThriftStatisticsPublisher implemen
 
     private DASMemberStatusPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
 STATS_PUBLISHER_THREAD_POOL_SIZE);
+        executorService = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberStatusPublisher getInstance() {
@@ -131,7 +131,7 @@ public class DASMemberStatusPublisher extends 
ThriftStatisticsPublisher implemen
                 payload.add(partitionId);
                 payload.add(memberId);
                 payload.add(status);
-                DASMemberStatusPublisher.super.publish(payload.toArray());
+                publish(payload.toArray());
             }
         };
         executorService.execute(publisher);

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
index ffe0380..fda1b41 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberInformationPublisher.java
@@ -20,12 +20,18 @@
 package org.apache.stratos.cloud.controller.statistics.publisher;
 
 import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import 
org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 /**
  * Member Information Publisher interface.
  */
-public interface MemberInformationPublisher extends StatisticsPublisher {
+public abstract class MemberInformationPublisher extends 
ThriftStatisticsPublisher {
+
+    public MemberInformationPublisher(StreamDefinition streamDefinition, 
String thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
+
     /**
      * Publishing member information.
      *
@@ -33,6 +39,6 @@ public interface MemberInformationPublisher extends 
StatisticsPublisher {
      * @param scalingDecisionId Scaling Decision Id
      * @param metadata          InstanceMetadata
      */
-    public void publish(String memberId, String scalingDecisionId, 
InstanceMetadata metadata);
+    public abstract void publish(String memberId, String scalingDecisionId, 
InstanceMetadata metadata);
 
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
index fad1006..4fa23b1 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/MemberStatusPublisher.java
@@ -19,12 +19,18 @@
 
 package org.apache.stratos.cloud.controller.statistics.publisher;
 
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+import 
org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 /**
  * Member Status Publisher Interface.
  */
-public interface MemberStatusPublisher extends StatisticsPublisher {
+public abstract class MemberStatusPublisher extends ThriftStatisticsPublisher {
+
+    public MemberStatusPublisher(StreamDefinition streamDefinition, String 
thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
+
     /**
      * Publishing member status.
      *
@@ -39,7 +45,7 @@ public interface MemberStatusPublisher extends 
StatisticsPublisher {
      * @param memberId           Member Id
      * @param status             Member Status
      */
-    void publish(Long timestamp, String applicationId, String clusterId,
-                 String clusterAlias, String clusterInstanceId, String 
serviceName,
-                 String networkPartitionId, String partitionId, String 
memberId, String status);
+    public abstract void publish(Long timestamp, String applicationId, String 
clusterId,
+                                 String clusterAlias, String 
clusterInstanceId, String serviceName,
+                                 String networkPartitionId, String 
partitionId, String memberId, String status);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
index 09efa1e..4609c9f 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java
@@ -22,7 +22,7 @@ package org.apache.stratos.common.exception;
 /**
  * This exception will be thrown when trying to create a publisher with 
invalid statistics publisher type.
  */
-public class InvalidStatisticsPublisherTypeException extends Exception {
+public class InvalidStatisticsPublisherTypeException extends RuntimeException {
 
     public InvalidStatisticsPublisherTypeException(String message) {
         super(message);

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index dd7ddd4..20f0ffe 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -19,10 +19,16 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
 /**
  * Health statistics publisher interface.
  */
-public interface HealthStatisticsPublisher extends StatisticsPublisher {
+public abstract class HealthStatisticsPublisher extends 
ThriftStatisticsPublisher {
+
+    public HealthStatisticsPublisher(StreamDefinition streamDefinition, String 
thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
 
     /**
      * Publish health statistics to complex event processor.
@@ -35,6 +41,6 @@ public interface HealthStatisticsPublisher extends 
StatisticsPublisher {
      * @param health             Health type: memory_consumption | load_average
      * @param value              Health type value
      */
-    void publish(String clusterId, String clusterInstanceId, String 
networkPartitionId,
-                 String memberId, String partitionId, String health, double 
value);
+    public abstract void publish(String clusterId, String clusterInstanceId, 
String networkPartitionId,
+                                 String memberId, String partitionId, String 
health, double value);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index 289be8b..af46ed1 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -19,10 +19,16 @@
 
 package org.apache.stratos.common.statistics.publisher;
 
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
 /**
  * In-flight request publisher interface.
  */
-public interface InFlightRequestPublisher extends StatisticsPublisher {
+public abstract class InFlightRequestPublisher extends 
ThriftStatisticsPublisher {
+
+    public InFlightRequestPublisher(StreamDefinition streamDefinition, String 
thriftClientName) {
+        super(streamDefinition, thriftClientName);
+    }
 
     /**
      * Publish in-flight request count.
@@ -32,5 +38,6 @@ public interface InFlightRequestPublisher extends 
StatisticsPublisher {
      * @param networkPartitionId   Network partition id of the cluster
      * @param inFlightRequestCount In-flight request count of the cluster
      */
-    void publish(String clusterId, String clusterInstanceId, String 
networkPartitionId, int inFlightRequestCount);
+    public abstract void publish(String clusterId, String clusterInstanceId, 
String networkPartitionId,
+                                 int inFlightRequestCount);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
index 4552f92..95c0478 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -77,9 +77,7 @@ public class ThriftStatisticsPublisher implements 
StatisticsPublisher {
         loadBalancingDataPublisher = new 
LoadBalancingDataPublisher(getReceiverGroups());
 
         //adding stream definition
-        if 
(!loadBalancingDataPublisher.isStreamDefinitionAdded(streamDefinition)) {
-            loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
-        }
+        loadBalancingDataPublisher.addStreamDefinition(streamDefinition);
     }
 
     private ArrayList<ReceiverGroup> getReceiverGroups() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index d025c33..03222ec 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -22,7 +22,6 @@ package 
org.apache.stratos.common.statistics.publisher.wso2.cep;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import 
org.apache.stratos.common.statistics.publisher.HealthStatisticsPublisher;
-import 
org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -33,7 +32,7 @@ import java.util.List;
 /**
  * Health statistics publisher for publishing statistics to WSO2 CEP.
  */
-public class WSO2CEPHealthStatisticsPublisher extends 
ThriftStatisticsPublisher implements HealthStatisticsPublisher {
+public class WSO2CEPHealthStatisticsPublisher extends 
HealthStatisticsPublisher {
 
     private static final Log log = 
LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
     private static volatile WSO2CEPHealthStatisticsPublisher 
wso2CEPHealthStatisticsPublisher;
@@ -109,6 +108,6 @@ public class WSO2CEPHealthStatisticsPublisher extends 
ThriftStatisticsPublisher
         payload.add(health);
         payload.add(value);
 
-        super.publish(payload.toArray());
+        publish(payload.toArray());
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/609bc9d9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 8c9189b..862a49d 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -22,7 +22,6 @@ package 
org.apache.stratos.common.statistics.publisher.wso2.cep;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
-import 
org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -36,7 +35,7 @@ import java.util.List;
  * In-flight request count:
  * Number of requests being served at a given moment could be identified as 
in-flight request count.
  */
-public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher 
implements InFlightRequestPublisher {
+public class WSO2CEPInFlightRequestPublisher extends InFlightRequestPublisher {
     private static final Log log = 
LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
     private static volatile WSO2CEPInFlightRequestPublisher 
wso2CEPInFlightRequestPublisher;
     private static final String DATA_STREAM_NAME = "in_flight_requests";
@@ -47,11 +46,11 @@ public class WSO2CEPInFlightRequestPublisher extends 
ThriftStatisticsPublisher i
         super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME);
     }
 
-    public static  WSO2CEPInFlightRequestPublisher getInstance() {
+    public static WSO2CEPInFlightRequestPublisher getInstance() {
         if (wso2CEPInFlightRequestPublisher == null) {
-            synchronized ( WSO2CEPInFlightRequestPublisher.class) {
+            synchronized (WSO2CEPInFlightRequestPublisher.class) {
                 if (wso2CEPInFlightRequestPublisher == null) {
-                    wso2CEPInFlightRequestPublisher = new  
WSO2CEPInFlightRequestPublisher();
+                    wso2CEPInFlightRequestPublisher = new 
WSO2CEPInFlightRequestPublisher();
                 }
             }
         }
@@ -81,10 +80,10 @@ public class WSO2CEPInFlightRequestPublisher extends 
ThriftStatisticsPublisher i
     /**
      * Publish in-flight request count of a cluster.
      *
-     * @param clusterId
-     * @param clusterInstanceId
-     * @param networkPartitionId
-     * @param inFlightRequestCount
+     * @param clusterId             Cluster id
+     * @param clusterInstanceId     Cluster instance id
+     * @param networkPartitionId    Cluster's network partition id
+     * @param inFlightRequestCount  Cluster's in-flight-request count
      */
     @Override
     public void publish(String clusterId, String clusterInstanceId, String 
networkPartitionId,
@@ -102,6 +101,6 @@ public class WSO2CEPInFlightRequestPublisher extends 
ThriftStatisticsPublisher i
         payload.add(networkPartitionId);
         payload.add((double) inFlightRequestCount);
 
-        super.publish(payload.toArray());
+        publish(payload.toArray());
     }
 }

Reply via email to