Refactoring thrift publisher classes
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/af13aeba Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/af13aeba Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/af13aeba Branch: refs/heads/stratos-4.1.x Commit: af13aebae4b78c9ba4df0286eb081d48794fc427 Parents: 962ce94 Author: Thanuja <[email protected]> Authored: Mon Nov 23 11:23:15 2015 +0530 Committer: Thanuja <[email protected]> Committed: Mon Nov 23 11:28:38 2015 +0530 ---------------------------------------------------------------------- .../publisher/AutoscalerPublisherFactory.java | 5 +- .../publisher/DASScalingDecisionPublisher.java | 20 ++++- .../autoscaler/util/AutoscalerConstants.java | 1 + .../CloudControllerPublisherFactory.java | 9 +- .../DASMemberInformationPublisher.java | 18 +++- .../publisher/DASMemberStatusPublisher.java | 17 +++- .../util/CloudControllerConstants.java | 1 + ...InvalidStatisticsPublisherTypeException.java | 30 +++++++ .../HealthStatisticsPublisherFactory.java | 5 +- .../InFlightRequestPublisherFactory.java | 5 +- .../publisher/ThriftStatisticsPublisher.java | 92 ++++++++++---------- .../cep/WSO2CEPHealthStatisticsPublisher.java | 15 +++- .../cep/WSO2CEPInFlightRequestPublisher.java | 15 +++- 13 files changed, 165 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java index d057108..8c688ba 
100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java @@ -19,6 +19,7 @@ package org.apache.stratos.autoscaler.statistics.publisher; +import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException; import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; /** @@ -29,9 +30,9 @@ public class AutoscalerPublisherFactory { public static ScalingDecisionPublisher createScalingDecisionPublisher(StatisticsPublisherType type) { if (type == StatisticsPublisherType.WSO2DAS) { - return new DASScalingDecisionPublisher(); + return DASScalingDecisionPublisher.getInstance(); } else { - throw new RuntimeException("Unknown statistics publisher type"); + throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher."); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java index a907043..097c568 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java @@ -38,22 +38,36 @@ import java.util.concurrent.ExecutorService; 
public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher implements ScalingDecisionPublisher { private static final Log log = LogFactory.getLog(DASScalingDecisionPublisher.class); + private static volatile DASScalingDecisionPublisher dasScalingDecisionPublisher; private static final String DATA_STREAM_NAME = "scaling_decision"; private static final String VERSION = "1.0.0"; private static final String DAS_THRIFT_CLIENT_NAME = "das"; + private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10; private ExecutorService executorService; public DASScalingDecisionPublisher() { super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME); - executorService = StratosThreadPool.getExecutorService("autoscaler.stats.publisher.thread.pool", 10); + executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.STATS_PUBLISHER_THREAD_POOL_ID, + STATS_PUBLISHER_THREAD_POOL_SIZE); + } + + public static DASScalingDecisionPublisher getInstance() { + if (dasScalingDecisionPublisher == null) { + synchronized (DASScalingDecisionPublisher.class) { + if (dasScalingDecisionPublisher == null) { + dasScalingDecisionPublisher = new DASScalingDecisionPublisher(); + } + } + } + return dasScalingDecisionPublisher; } private static StreamDefinition createStreamDefinition() { try { // Create stream definition StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION); - streamDefinition.setNickName("Member Information"); - streamDefinition.setDescription("Member Information"); + streamDefinition.setNickName("Scaling Decision"); + streamDefinition.setDescription("Scaling Decision"); List<Attribute> payloadData = new ArrayList<Attribute>(); // Set payload definition http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java index 997ab0c..ef12983 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java @@ -72,6 +72,7 @@ public final class AutoscalerConstants { public static final String PAYLOAD_DEPLOYMENT = "default"; public static final String MONITOR_THREAD_POOL_ID = "monitor.thread.pool"; + public static final String STATS_PUBLISHER_THREAD_POOL_ID = "autoscaler.stats.publisher.thread.pool"; public static final String MONITOR_THREAD_POOL_SIZE = "monitor.thread.pool.size"; public static final String CLUSTER_MONITOR_SCHEDULER_ID = "cluster.monitor.scheduler"; public static final String MEMBER_FAULT_EVENT_NAME = "member_fault"; http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java index db68396..87d7ab9 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java +++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java @@ -19,6 +19,7 @@ package org.apache.stratos.cloud.controller.statistics.publisher; +import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException; import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; /** @@ -33,9 +34,9 @@ public class CloudControllerPublisherFactory { */ public static MemberInformationPublisher createMemberInformationPublisher(StatisticsPublisherType type) { if (type == StatisticsPublisherType.WSO2DAS) { - return new DASMemberInformationPublisher(); + return DASMemberInformationPublisher.getInstance(); } else { - throw new RuntimeException("Unknown statistics publisher type"); + throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher."); } } @@ -47,9 +48,9 @@ public class CloudControllerPublisherFactory { */ public static MemberStatusPublisher createMemberStatusPublisher(StatisticsPublisherType type) { if (type == StatisticsPublisherType.WSO2DAS) { - return new DASMemberStatusPublisher(); + return DASMemberStatusPublisher.getInstance(); } else { - throw new RuntimeException("Unknown statistics publisher type"); + throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher."); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java index d0dcc49..4ab65e1 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java @@ -44,16 +44,28 @@ import java.util.concurrent.ExecutorService; public class DASMemberInformationPublisher extends ThriftStatisticsPublisher implements MemberInformationPublisher { private static final Log log = LogFactory.getLog(DASMemberInformationPublisher.class); - + private static volatile DASMemberInformationPublisher dasMemberInformationPublisher; private static final String DATA_STREAM_NAME = "member_info"; private static final String VERSION = "1.0.0"; private static final String DAS_THRIFT_CLIENT_NAME = "das"; + private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10; private static final String VALUE_NOT_FOUND = "Value Not Found"; private ExecutorService executorService; - public DASMemberInformationPublisher() { + private DASMemberInformationPublisher() { super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME); - executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10); + executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE); + } + + public static DASMemberInformationPublisher getInstance() { + if (dasMemberInformationPublisher == null) { + synchronized (DASMemberInformationPublisher.class) { + if (dasMemberInformationPublisher == null) { + dasMemberInformationPublisher = new DASMemberInformationPublisher(); + } + } + } + return dasMemberInformationPublisher; } private static StreamDefinition 
createStreamDefinition() { http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java index 877256d..332bbba 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java @@ -38,14 +38,27 @@ import java.util.concurrent.ExecutorService; public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implements MemberStatusPublisher { private static final Log log = LogFactory.getLog(DASMemberStatusPublisher.class); + private static volatile DASMemberStatusPublisher dasMemberStatusPublisher; private static final String DATA_STREAM_NAME = "member_lifecycle"; private static final String VERSION = "1.0.0"; private static final String DAS_THRIFT_CLIENT_NAME = "das"; + private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10; private ExecutorService executorService; - public DASMemberStatusPublisher() { + private DASMemberStatusPublisher() { super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME); - executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10); + executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, STATS_PUBLISHER_THREAD_POOL_SIZE); + } + + public static 
DASMemberStatusPublisher getInstance() { + if (dasMemberStatusPublisher == null) { + synchronized (DASMemberStatusPublisher.class) { + if (dasMemberStatusPublisher == null) { + dasMemberStatusPublisher = new DASMemberStatusPublisher(); + } + } + } + return dasMemberStatusPublisher; } private static StreamDefinition createStreamDefinition() { http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java index c025bb4..ccd8d34 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java @@ -162,6 +162,7 @@ public final class CloudControllerConstants { public static final String TIMESTAMP_COL = "timestamp"; public static final String SCALING_DECISION_ID_COL = "scaling_decision_id"; + public static final String STATS_PUBLISHER_THREAD_POOL_ID = "cloud.controller.stats.publisher.thread.pool"; /** * Properties http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java new file mode 100644 index 0000000..09efa1e --- /dev/null +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/exception/InvalidStatisticsPublisherTypeException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.common.exception; + +/** + * This exception will be thrown when trying to create a publisher with invalid statistics publisher type. 
+ */ +public class InvalidStatisticsPublisherTypeException extends Exception { + + public InvalidStatisticsPublisherTypeException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java index e4047ab..bf67c1b 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisherFactory.java @@ -19,6 +19,7 @@ package org.apache.stratos.common.statistics.publisher; +import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException; import org.apache.stratos.common.statistics.publisher.wso2.cep.WSO2CEPHealthStatisticsPublisher; /** @@ -28,9 +29,9 @@ public class HealthStatisticsPublisherFactory { public static HealthStatisticsPublisher createHealthStatisticsPublisher(StatisticsPublisherType type) { if (type == StatisticsPublisherType.WSO2CEP) { - return new WSO2CEPHealthStatisticsPublisher(); + return WSO2CEPHealthStatisticsPublisher.getInstance(); } else { - throw new RuntimeException("Unknown statistics publisher type"); + throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher."); } } } 
http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java index c942bce..a4b9db8 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisherFactory.java @@ -19,6 +19,7 @@ package org.apache.stratos.common.statistics.publisher; +import org.apache.stratos.common.exception.InvalidStatisticsPublisherTypeException; import org.apache.stratos.common.statistics.publisher.wso2.cep.WSO2CEPInFlightRequestPublisher; /** @@ -28,9 +29,9 @@ public class InFlightRequestPublisherFactory { public static InFlightRequestPublisher createInFlightRequestPublisher(StatisticsPublisherType type) { if (type == StatisticsPublisherType.WSO2CEP) { - return new WSO2CEPInFlightRequestPublisher(); + return WSO2CEPInFlightRequestPublisher.getInstance(); } else { - throw new RuntimeException("Unknown statistics publisher type"); + throw new InvalidStatisticsPublisherTypeException("Invalid statistics publisher type is used to create publisher."); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java index 7d4aa6e..16dba16 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java @@ -21,14 +21,10 @@ package org.apache.stratos.common.statistics.publisher; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.wso2.carbon.databridge.agent.thrift.Agent; -import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; -import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration; import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; import org.wso2.carbon.databridge.agent.thrift.lb.DataPublisherHolder; import org.wso2.carbon.databridge.agent.thrift.lb.LoadBalancingDataPublisher; import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverGroup; -import org.wso2.carbon.databridge.agent.thrift.util.DataPublisherUtil; import org.wso2.carbon.databridge.commons.Event; import org.wso2.carbon.databridge.commons.StreamDefinition; @@ -52,8 +48,8 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher { * Credential information stored inside thrift-client-config.xml file * is parsed and assigned into ip,port,username and password fields * - * @param streamDefinition Thrift Event Stream Definition - * @param thriftClientName Thrift Client Name + * @param streamDefinition Thrift Event Stream Definition + * @param thriftClientName Thrift Client Name */ public ThriftStatisticsPublisher(StreamDefinition streamDefinition, String thriftClientName) { ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance(); 
@@ -61,54 +57,58 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher { this.streamDefinition = streamDefinition; if (isPublisherEnabled()) { - this.enabled = true; + this.enabled = true; init(); } } private boolean isPublisherEnabled() { - boolean publisherEnabled = false; - for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) { - publisherEnabled = thriftClientInfo.isStatsPublisherEnabled(); - if(publisherEnabled){ - break; - } - } - return publisherEnabled; - } - - private void init() { - + boolean publisherEnabled = false; + for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) { + publisherEnabled = thriftClientInfo.isStatsPublisherEnabled(); + if (publisherEnabled) { + break; + } + } + return publisherEnabled; + } + + private void init() { + // Initialize load balancing data publisher loadBalancingDataPublisher = new LoadBalancingDataPublisher(getReceiverGroups()); - loadBalancingDataPublisher.addStreamDefinition(streamDefinition); + + //adding stream definition + if (!loadBalancingDataPublisher.isStreamDefinitionAdded(streamDefinition)) { + loadBalancingDataPublisher.addStreamDefinition(streamDefinition); + } } private ArrayList<ReceiverGroup> getReceiverGroups() { - - ArrayList<ReceiverGroup> receiverGroups = new ArrayList<ReceiverGroup>(); - + + ArrayList<ReceiverGroup> receiverGroups = new ArrayList<ReceiverGroup>(); + for (ThriftClientInfo thriftClientInfo : thriftClientInfoList) { - ArrayList<DataPublisherHolder> dataPublisherHolders = new ArrayList<DataPublisherHolder>(); - DataPublisherHolder aNode = new DataPublisherHolder(null, buildUrl(thriftClientInfo), thriftClientInfo.getUsername(), thriftClientInfo.getPassword()); - dataPublisherHolders.add(aNode); - ReceiverGroup group = new ReceiverGroup(dataPublisherHolders); - receiverGroups.add(group); - } - return receiverGroups; - - } - - private String buildUrl(ThriftClientInfo thriftClientInfo) { - String url = new StringBuilder() - .append("tcp://") - 
.append(thriftClientInfo.getIp()) - .append(":") - .append(thriftClientInfo.getPort()).toString(); - return url; - } - - @Override + ArrayList<DataPublisherHolder> dataPublisherHolders = new ArrayList<DataPublisherHolder>(); + DataPublisherHolder aNode = new DataPublisherHolder(null, buildUrl(thriftClientInfo), thriftClientInfo.getUsername(), thriftClientInfo.getPassword()); + dataPublisherHolders.add(aNode); + ReceiverGroup group = new ReceiverGroup(dataPublisherHolders); + receiverGroups.add(group); + } + return receiverGroups; + + } + + private String buildUrl(ThriftClientInfo thriftClientInfo) { + String url = new StringBuilder() + .append("tcp://") + .append(thriftClientInfo.getIp()) + .append(":") + .append(thriftClientInfo.getPort()).toString(); + return url; + } + + @Override public void setEnabled(boolean enabled) { this.enabled = enabled; if (this.enabled) { @@ -138,11 +138,11 @@ public class ThriftStatisticsPublisher implements StatisticsPublisher { } loadBalancingDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); if (log.isDebugEnabled()) { - log.debug(String.format("Successfully Published ******** thrift event: [stream] %s [version] %s", + log.debug(String.format("Successfully Published thrift event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion())); } - - + + } catch (AgentException e) { if (log.isErrorEnabled()) { log.error(String.format("Could not publish thrift event: [stream] %s [version] %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java index ea1c401..d025c33 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java @@ -36,15 +36,26 @@ import java.util.List; public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher { private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class); - + private static volatile WSO2CEPHealthStatisticsPublisher wso2CEPHealthStatisticsPublisher; private static final String DATA_STREAM_NAME = "cartridge_agent_health_stats"; private static final String VERSION = "1.0.0"; private static final String CEP_THRIFT_CLIENT_NAME = "cep"; - public WSO2CEPHealthStatisticsPublisher() { + private WSO2CEPHealthStatisticsPublisher() { super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME); } + public static WSO2CEPHealthStatisticsPublisher getInstance() { + if (wso2CEPHealthStatisticsPublisher == null) { + synchronized (WSO2CEPHealthStatisticsPublisher.class) { + if (wso2CEPHealthStatisticsPublisher == null) { + wso2CEPHealthStatisticsPublisher = new WSO2CEPHealthStatisticsPublisher(); + } + } + } + return wso2CEPHealthStatisticsPublisher; + } + private static StreamDefinition createStreamDefinition() { try { // Create stream definition http://git-wip-us.apache.org/repos/asf/stratos/blob/af13aeba/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java index 24465c7..8c9189b 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java @@ -38,15 +38,26 @@ import java.util.List; */ public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher { private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class); - + private static volatile WSO2CEPInFlightRequestPublisher wso2CEPInFlightRequestPublisher; private static final String DATA_STREAM_NAME = "in_flight_requests"; private static final String VERSION = "1.0.0"; private static final String CEP_THRIFT_CLIENT_NAME = "cep"; - public WSO2CEPInFlightRequestPublisher() { + private WSO2CEPInFlightRequestPublisher() { super(createStreamDefinition(), CEP_THRIFT_CLIENT_NAME); } + public static WSO2CEPInFlightRequestPublisher getInstance() { + if (wso2CEPInFlightRequestPublisher == null) { + synchronized ( WSO2CEPInFlightRequestPublisher.class) { + if (wso2CEPInFlightRequestPublisher == null) { + wso2CEPInFlightRequestPublisher = new WSO2CEPInFlightRequestPublisher(); + } + } + } + return wso2CEPInFlightRequestPublisher; + } + private static StreamDefinition createStreamDefinition() { try { // Create stream definition
