Publishing metering service event streams
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b38e27c9 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b38e27c9 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b38e27c9 Branch: refs/heads/stratos-4.1.x Commit: b38e27c9adfa5e28d62e8ba842118e0cf6daafcd Parents: a34decc Author: Thanuja <[email protected]> Authored: Tue Sep 22 22:25:24 2015 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Mon Sep 28 18:44:17 2015 +0530 ---------------------------------------------------------------------- .../client/AutoscalerCloudControllerClient.java | 9 +- .../monitor/cluster/ClusterMonitor.java | 16 +- .../autoscaler/rule/RuleTasksDelegator.java | 5 +- .../publisher/AutoscalerPublisherFactory.java | 37 +++ .../publisher/DASScalingDecisionPublisher.java | 161 +++++++++++++ .../publisher/ScalingDecisionPublisher.java | 57 +++++ .../autoscaler/util/AutoscalerConstants.java | 19 ++ .../messaging/topology/TopologyBuilder.java | 227 +++++++++++++++---- .../impl/CloudControllerServiceUtil.java | 13 +- .../services/impl/InstanceCreator.java | 13 -- .../publisher/BAMUsageDataPublisher.java | 213 ----------------- .../CloudControllerPublisherFactory.java | 55 +++++ .../DASMemberInformationPublisher.java | 161 +++++++++++++ .../publisher/DASMemberStatusPublisher.java | 127 +++++++++++ .../publisher/MemberInformationPublisher.java | 38 ++++ .../publisher/MemberStatusPublisher.java | 45 ++++ .../util/CloudControllerConstants.java | 32 +++ .../controller/util/CloudControllerUtil.java | 16 +- .../common/client/AutoscalerServiceClient.java | 20 +- .../common/constants/StratosConstants.java | 1 + .../src/main/conf/drools/dependent-scaling.drl | 21 +- .../src/main/conf/drools/mincheck.drl | 22 +- .../src/main/conf/drools/scaling.drl | 24 +- .../src/test/resources/common/scaling.drl | 25 +- .../resources/common/thrift-client-config.xml | 22 +- 25 files changed, 1064 insertions(+), 315 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java index 0124206..9f380d0 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java @@ -75,8 +75,8 @@ public class AutoscalerCloudControllerClient { public synchronized MemberContext startInstance(PartitionRef partition, String clusterId, String clusterInstanceId, - String networkPartitionId, - int minMemberCount) throws SpawningException { + String networkPartitionId, int minMemberCount, + String scalingDecisionId) throws SpawningException { try { if (log.isInfoEnabled()) { log.info(String.format("Trying to spawn an instance via cloud controller: " + @@ -102,8 +102,11 @@ public class AutoscalerCloudControllerClient { Property minCountProp = new Property(); minCountProp.setName(StratosConstants.MIN_COUNT); minCountProp.setValue(String.valueOf(minMemberCount)); - memberContextProps.addProperty(minCountProp); + Property scalingDecisionIdProp = new Property(); + scalingDecisionIdProp.setName(StratosConstants.SCALING_DECISION_ID); + scalingDecisionIdProp.setValue(String.valueOf(scalingDecisionId)); + memberContextProps.addProperty(scalingDecisionIdProp); instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps)); long startTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java index 43493bd..aadad54 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java @@ -45,12 +45,10 @@ import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiv import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor; import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor; import org.apache.stratos.autoscaler.util.AutoscalerConstants; -import org.apache.stratos.autoscaler.util.AutoscalerObjectConverter; import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.common.Properties; -import org.apache.stratos.common.Property; import org.apache.stratos.common.client.CloudControllerServiceClient; import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.common.threading.StratosThreadPool; @@ -328,8 +326,8 @@ public class ClusterMonitor extends Monitor { public void run() { if (log.isDebugEnabled()) { - log.debug(String.format("Cluster monitor is running: [application-id] %s [cluster-id]: " + - "%s", getAppId(), getClusterId())); + log.debug(String.format("Cluster monitor is running: [application-id] " + + "%s [cluster-id]: %s", getAppId(), getClusterId())); } instanceContext.getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); @@ -698,8 +696,8 @@ public class ClusterMonitor extends Monitor { Float floatValue = averageRequestsServingCapabilityEvent.getValue(); if (log.isDebugEnabled()) { - log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s", - clusterId, networkPartitionId, floatValue)); + log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s " + + "[value] %s", clusterId, networkPartitionId, floatValue)); } ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( @@ -1231,7 +1229,8 @@ public class ClusterMonitor extends Monitor { // active members if (AutoscalerContext.getInstance().getAppMonitor(getAppId()).isForce()) { - log.info(String.format("Terminating all remaining members of partition [partition-id] %s [application-id] %s", partitionContext.getPartitionId(), getAppId())); + log.info(String.format("Terminating all remaining members of partition [partition-id] %s" + + " [application-id] %s", partitionContext.getPartitionId(), getAppId())); partitionContext.terminateAllRemainingInstances(); } @@ -1301,7 +1300,8 @@ public class ClusterMonitor extends Monitor { } if (AutoscalerContext.getInstance().getAppMonitor(getAppId()).isForce()) { - log.info(String.format("Terminating all remaining members of partition [partition-id] %s [application-id] %s", partitionContext.getPartitionId(), getAppId())); + log.info(String.format("Terminating all remaining members of partition [partition-id] %s " + + "[application-id] %s", partitionContext.getPartitionId(), getAppId())); partitionContext.terminateAllRemainingInstances(); } //Need to terminate pending members http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index 1a8bfc9..132306a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -171,9 +171,10 @@ public class RuleTasksDelegator { * @param clusterMonitorPartitionContext Cluster monitor partition context * @param clusterId Cluster id * @param clusterInstanceId Instance id + * @param scalingDecisionId Scaling Decision id */ public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId, - String clusterInstanceId) { + String clusterInstanceId, String scalingDecisionId) { try { String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId(); @@ -194,7 +195,7 @@ public class RuleTasksDelegator { .startInstance(clusterMonitorPartitionContext.getPartition(), clusterId, clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(), - minimumCountOfNetworkPartition); + minimumCountOfNetworkPartition, scalingDecisionId); if (memberContext != null) { ClusterLevelPartitionContext partitionContext = clusterInstanceContext. getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId()); http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java new file mode 100644 index 0000000..d057108 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.autoscaler.statistics.publisher; + +import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; + +/** + * Creating ScalingDecisionPublisher. + */ +public class AutoscalerPublisherFactory { + + public static ScalingDecisionPublisher createScalingDecisionPublisher(StatisticsPublisherType type) { + + if (type == StatisticsPublisherType.WSO2DAS) { + return new DASScalingDecisionPublisher(); + } else { + throw new RuntimeException("Unknown statistics publisher type"); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java new file mode 100644 index 0000000..a907043 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.autoscaler.statistics.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.util.AutoscalerConstants; +import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.AttributeType; +import org.wso2.carbon.databridge.commons.StreamDefinition; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +/** + * MemberInfoPublisher to publish member information/metadata to DAS. + */ +public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher implements ScalingDecisionPublisher { + + private static final Log log = LogFactory.getLog(DASScalingDecisionPublisher.class); + private static final String DATA_STREAM_NAME = "scaling_decision"; + private static final String VERSION = "1.0.0"; + private static final String DAS_THRIFT_CLIENT_NAME = "das"; + private ExecutorService executorService; + + public DASScalingDecisionPublisher() { + super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME); + executorService = StratosThreadPool.getExecutorService("autoscaler.stats.publisher.thread.pool", 10); + } + + private static StreamDefinition createStreamDefinition() { + try { + // Create stream definition + StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION); + streamDefinition.setNickName("Member Information"); + streamDefinition.setDescription("Member Information"); + List<Attribute> payloadData = new ArrayList<Attribute>(); + + // Set payload definition + payloadData.add(new Attribute(AutoscalerConstants.TIMESTAMP, AttributeType.LONG)); + payloadData.add(new Attribute(AutoscalerConstants.SCALING_DECISION_ID, AttributeType.STRING)); + payloadData.add(new Attribute(AutoscalerConstants.CLUSTER_ID, AttributeType.STRING)); + payloadData.add(new Attribute(AutoscalerConstants.MIN_INSTANCE_COUNT, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.MAX_INSTANCE_COUNT, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.RIF_PREDICTED, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.RIF_THRESHOLD, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.RIF_REQUIRED_INSTANCES, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.MC_PREDICTED, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.MC_THRESHOLD, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.MC_REQUIRED_INSTANCES, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.LA_PREDICTED, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.LA_THRESHOLD, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.LA_REQUIRED_INSTANCES, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.REQUIRED_INSTANCE_COUNT, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.ACTIVE_INSTANCE_COUNT, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.ADDITIONAL_INSTANCE_COUNT, AttributeType.INT)); + payloadData.add(new Attribute(AutoscalerConstants.SCALING_REASON, AttributeType.STRING)); + streamDefinition.setPayloadData(payloadData); + return streamDefinition; + } catch (Exception e) { + throw new RuntimeException("Could not create stream definition", e); + } + } + + /** + * Publishing scaling decision to DAS. + * + * @param timestamp Scaling Time + * @param scalingDecisionId Scaling Decision Id + * @param clusterId Cluster Id + * @param minInstanceCount Minimum Instance Count + * @param maxInstanceCount Maximum Instance Count + * @param rifPredicted RIF Predicted + * @param rifThreshold RIF Threshold + * @param rifRequiredInstances RIF Required Instances + * @param mcPredicted MC Predicted + * @param mcThreshold MC Threshold + * @param mcRequiredInstances MC Required Instances + * @param laPredicted LA Predicted + * @param laThreshold LA Threshold + * @param laRequiredInstance LA Required Instance + * @param requiredInstanceCount Required Instance Count + * @param activeInstanceCount Active Instance Count + * @param additionalInstanceCount Additional Instance Needed + * @param scalingReason Scaling Reason + */ + @Override + public void publish(final Long timestamp, final String scalingDecisionId, final String clusterId, + final int minInstanceCount, final int maxInstanceCount, + final int rifPredicted, final int rifThreshold, final int rifRequiredInstances, + final int mcPredicted, final int mcThreshold, final int mcRequiredInstances, + final int laPredicted, final int laThreshold, final int laRequiredInstance, + final int requiredInstanceCount, final int activeInstanceCount, + final int additionalInstanceCount, final String scalingReason) { + Runnable publisher = new Runnable() { + @Override + public void run() { + if (log.isDebugEnabled()) + + { + log.debug(String.format("Publishing scaling decision: [timestamp] %d [scaling_decision_id] %s " + + "[cluster_id] %s [min_instance_count] %d [max_instance_count] %d " + + "[rif_predicted] %d [rif_threshold] %d [rif_required_instances] %d " + + "[mc_predicted] %d [mc_threshold] %d [mc_required_instances] %d " + + "[la_predicted] %d [la_threshold] %d [la_required_instances] %d " + + "[required_instance_count] %d [active_instance_count] %d " + + "[addtitional_instance_count] %d [scaling_reason] %s", + timestamp, scalingDecisionId, clusterId, minInstanceCount, maxInstanceCount, rifPredicted, + rifThreshold, rifRequiredInstances, mcPredicted, mcThreshold, mcRequiredInstances, + laPredicted, laThreshold, laRequiredInstance, requiredInstanceCount, activeInstanceCount, + additionalInstanceCount, scalingReason)); + } + + //adding payload data + List<Object> payload = new ArrayList<Object>(); + payload.add(timestamp); + payload.add(scalingDecisionId); + payload.add(clusterId); + payload.add(minInstanceCount); + payload.add(maxInstanceCount); + payload.add(rifPredicted); + payload.add(rifThreshold); + payload.add(rifRequiredInstances); + payload.add(mcPredicted); + payload.add(mcThreshold); + payload.add(mcRequiredInstances); + payload.add(laPredicted); + payload.add(laThreshold); + payload.add(laRequiredInstance); + payload.add(requiredInstanceCount); + payload.add(activeInstanceCount); + payload.add(additionalInstanceCount); + payload.add(scalingReason); + DASScalingDecisionPublisher.super.publish(payload.toArray()); + } + + }; + executorService.execute(publisher); + } +} + http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java new file mode 100644 index 0000000..f7b0087 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.autoscaler.statistics.publisher; + +import org.apache.stratos.common.statistics.publisher.StatisticsPublisher; + +/** + * Scaling Decision Publisher interface. + */ +public interface ScalingDecisionPublisher extends StatisticsPublisher { + /** + * Publishing scaling decision to DAS. + * + * @param timestamp Scaling Time + * @param scalingDecisionId Scaling Decision Id + * @param clusterId Cluster Id + * @param minInstanceCount Minimum Instance Count + * @param maxInstanceCount Maximum Instance Count + * @param rifPredicted RIF Predicted + * @param rifThreshold RIF Threshold + * @param rifRequiredInstances RIF Required Instances + * @param mcPredicted MC Predicted + * @param mcThreshold MC Threshold + * @param mcRequiredInstances MC Required Instances + * @param laPredicted LA Predicted + * @param laThreshold LA Threshold + * @param laRequiredInstance LA Required Instance + * @param requiredInstanceCount Required Instance Count + * @param activeInstanceCount Active Instance Count + * @param additionalInstanceCount Additional Instance Needed + * @param scalingReason Scaling Reason + */ + public void publish(Long timestamp, String scalingDecisionId, String clusterId, + int minInstanceCount, int maxInstanceCount, + int rifPredicted, int rifThreshold, int rifRequiredInstances, + int mcPredicted, int mcThreshold, int mcRequiredInstances, + int laPredicted, int laThreshold, int laRequiredInstance, + int requiredInstanceCount, int activeInstanceCount, int additionalInstanceCount, + String scalingReason); +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java index 788c79b..caea65a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java @@ -118,4 +118,23 @@ public final class AutoscalerConstants { public static final String IDENTITY_APPLICATION_SERVICE_SFX = "services/IdentityApplicationManagementService"; public static final String TOKEN_ENDPOINT_SFX = "oauth2/token"; public static final String TERMINATE_DEPENDENTS = "terminate-dependents"; + //scaling decision payload values + public static final String TIMESTAMP = "timestamp"; + public static final String SCALING_DECISION_ID = "scaling_decision_id"; + public static final String CLUSTER_ID = "cluster_id"; + public static final String MIN_INSTANCE_COUNT = "min_instance_count"; + public static final String MAX_INSTANCE_COUNT = "max_instance_count"; + public static final String RIF_PREDICTED = "rif_predicted"; + public static final String RIF_THRESHOLD = "rif_threshold"; + public static final String RIF_REQUIRED_INSTANCES = "rif_required_instances"; + public static final String MC_PREDICTED = "mc_predicted"; + public static final String MC_THRESHOLD = "mc_threshold"; + public static final String MC_REQUIRED_INSTANCES = "mc_required_instances"; + public static final String LA_PREDICTED = "la_predicted"; + public static final String LA_THRESHOLD = "la_threshold"; + public static final String LA_REQUIRED_INSTANCES = "la_required_instances"; + public static final String REQUIRED_INSTANCE_COUNT = "required_instance_count"; + public static final String ACTIVE_INSTANCE_COUNT = "active_instance_count"; + public static final String ADDITIONAL_INSTANCE_COUNT = "additional_instance_count"; + public static final String SCALING_REASON = "scaling_reason"; } http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index 936f9ec..7348b81 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -28,9 +28,13 @@ import org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeExcepti import org.apache.stratos.cloud.controller.exception.InvalidMemberException; import org.apache.stratos.cloud.controller.iaases.kubernetes.KubernetesIaas; import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPublisher; -import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; +import org.apache.stratos.cloud.controller.statistics.publisher.CloudControllerPublisherFactory; +import org.apache.stratos.cloud.controller.statistics.publisher.MemberInformationPublisher; +import org.apache.stratos.cloud.controller.statistics.publisher.MemberStatusPublisher; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.common.Property; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; import org.apache.stratos.kubernetes.client.KubernetesConstants; import org.apache.stratos.messaging.domain.application.ClusterDataHolder; import org.apache.stratos.messaging.domain.instance.ClusterInstance; @@ -148,7 +152,6 @@ public class TopologyBuilder { } - public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) { TopologyManager.acquireWriteLock(); @@ -371,14 +374,15 @@ public class TopologyBuilder { */ public static void handleMemberCreatedEvent(MemberContext memberContext) { Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(memberContext.getCartridgeType()); String clusterId = memberContext.getClusterId(); Cluster cluster = service.getCluster(clusterId); + String applicationId = service.getCluster(memberContext.getClusterId()).getAppId(); String memberId = memberContext.getMemberId(); String clusterInstanceId = memberContext.getClusterInstanceId(); String networkPartitionId = memberContext.getNetworkPartitionId(); String partitionId = memberContext.getPartition().getId(); + String clusterAlias = CloudControllerUtil.getAliasFromClusterId(memberContext.getClusterId()); String lbClusterId = memberContext.getLbClusterId(); long initTime = memberContext.getInitTime(); @@ -396,6 +400,31 @@ public class TopologyBuilder { member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); cluster.addMember(member); TopologyManager.updateTopology(topology); + + //member created time + Long timestamp = System.currentTimeMillis(); + //publishing member status to DAS + MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. + createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); + + if (memStatusPublisher.isEnabled()) { + if (log.isDebugEnabled()) { + log.debug("Publishing Member Status to DAS"); + } + memStatusPublisher.publish(timestamp, + applicationId, + memberContext.getClusterId(), + clusterAlias, + memberContext.getClusterInstanceId(), + memberContext.getCartridgeType(), + memberContext.getNetworkPartitionId(), + memberContext.getPartition().getId(), + memberContext.getMemberId(), + MemberStatus.Created.toString()); + } else { + log.warn("Member Status Publisher is not enabled"); + } + } finally { TopologyManager.releaseWriteLock(); } @@ -411,6 +440,7 @@ public class TopologyBuilder { public static void handleMemberInitializedEvent(MemberContext memberContext) { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(memberContext.getCartridgeType()); + if (service == null) { log.warn(String.format("Service %s does not exist", memberContext.getCartridgeType())); @@ -423,6 +453,8 @@ public class TopologyBuilder { return; } + String applicationId = service.getCluster(memberContext.getClusterId()).getAppId(); + String clusterAlias = CloudControllerUtil.getAliasFromClusterId(memberContext.getClusterId()); Member member = service.getCluster(memberContext.getClusterId()). getMember(memberContext.getMemberId()); if (member == null) { @@ -464,18 +496,48 @@ public class TopologyBuilder { log.info("Member status updated to initialized"); TopologyManager.updateTopology(topology); - + //member intialized time + Long timestamp = System.currentTimeMillis(); TopologyEventPublisher.sendMemberInitializedEvent(memberContext); - //publishing data - BAMUsageDataPublisher.publish(memberContext.getMemberId(), - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Initialized.toString(), - null); + //publishing member information and status to DAS + MemberInformationPublisher memInfoPublisher = CloudControllerPublisherFactory. + createMemberInformationPublisher(StatisticsPublisherType.WSO2DAS); + + MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. + createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); + + if (memInfoPublisher.isEnabled()) { + if (log.isDebugEnabled()) { + log.info("Publishing Member Information"); + } + String scalingDecisionId = memberContext.getProperties().getProperty( + StratosConstants.SCALING_DECISION_ID).getValue(); + memInfoPublisher.publish(memberContext.getMemberId(), scalingDecisionId, + memberContext.getInstanceMetadata()); + } else { + log.warn("Member Information Publisher is not enabled"); + } + if (memStatusPublisher.isEnabled()) { + if (log.isDebugEnabled()) { + log.debug("Publishing Member Status to DAS"); + } + memStatusPublisher.publish(timestamp, + applicationId, + memberContext.getClusterId(), + clusterAlias, + memberContext.getClusterInstanceId(), + memberContext.getCartridgeType(), + memberContext.getNetworkPartitionId(), + memberContext.getPartition().getId(), + memberContext.getMemberId(), + MemberStatus.Initialized.toString()); + } else { + log.warn("Member Status Publisher is not enabled"); + } } - } finally { + } finally + + { TopologyManager.releaseWriteLock(); } } @@ -495,6 +557,7 @@ public class TopologyBuilder { try { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceStartedEvent.getServiceName()); + if (service == null) { log.warn(String.format("Service %s does not exist", instanceStartedEvent.getServiceName())); @@ -507,6 +570,8 @@ public class TopologyBuilder { return; } + String applicationId = service.getCluster(instanceStartedEvent.getClusterId()).getAppId(); + String clusterAlias = CloudControllerUtil.getAliasFromClusterId(instanceStartedEvent.getClusterId()); Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId()); Member member = cluster.getMember(instanceStartedEvent.getMemberId()); if (member == null) { @@ -527,16 +592,31 @@ public class TopologyBuilder { log.info("member started event adding status started"); TopologyManager.updateTopology(topology); + //member started time + Long timestamp = System.currentTimeMillis(); //memberStartedEvent. TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); - //publishing data - BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(), - instanceStartedEvent.getPartitionId(), - instanceStartedEvent.getNetworkPartitionId(), - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName(), - MemberStatus.Starting.toString(), - null); + //publishing member status to DAS + MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. + createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); + + if (memStatusPublisher.isEnabled()) { + if (log.isDebugEnabled()) { + log.debug("Publishing Member Status to DAS"); + } + memStatusPublisher.publish(timestamp, + applicationId, + instanceStartedEvent.getClusterId(), + clusterAlias, + instanceStartedEvent.getClusterInstanceId(), + instanceStartedEvent.getServiceName(), + instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getPartitionId(), + instanceStartedEvent.getMemberId(), + MemberStatus.Starting.toString()); + } else { + log.warn("Member Status Publisher is not enabled"); + } } } finally { TopologyManager.releaseWriteLock(); @@ -549,9 +629,11 @@ public class TopologyBuilder { } } + public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceActivatedEvent.getServiceName()); + if (service == null) { log.warn(String.format("Service %s does not exist", instanceActivatedEvent.getServiceName())); @@ -565,6 +647,9 @@ public class TopologyBuilder { return; } + String applicationId = service.getCluster(instanceActivatedEvent.getClusterId()).getAppId(); + String clusterAlias = CloudControllerUtil.getAliasFromClusterId(instanceActivatedEvent.getClusterId()); + Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); if (member == null) { log.warn(String.format("Member %s does not exist", @@ -587,7 +672,8 @@ public class TopologyBuilder { TopologyManager.acquireWriteLock(); // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.Active)) { - log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]"); + log.error("Invalid state transition from [" + member.getStatus() + "] to [" + + MemberStatus.Active + "]"); return; } else { member.setStatus(MemberStatus.Active); @@ -632,15 +718,31 @@ public class TopologyBuilder { // Publish member activated event TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); + //member activated time + Long timestamp = System.currentTimeMillis(); + // Publish member activated event + TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); - // Publish statistics data - BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), - memberActivatedEvent.getPartitionId(), - memberActivatedEvent.getNetworkPartitionId(), - memberActivatedEvent.getClusterId(), - memberActivatedEvent.getServiceName(), - MemberStatus.Active.toString(), - null); + //publishing member status to DAS + MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. + createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); + if (memStatusPublisher.isEnabled()) { + if (log.isDebugEnabled()) { + log.debug("Publishing Member Status to DAS"); + } + memStatusPublisher.publish(timestamp, + applicationId, + memberActivatedEvent.getClusterId(), + clusterAlias, + memberActivatedEvent.getClusterInstanceId(), + memberActivatedEvent.getServiceName(), + memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getPartitionId(), + memberActivatedEvent.getMemberId(), + MemberStatus.Active.toString()); + } else { + log.warn("Member Status Publisher is not enabled"); + } } } finally { TopologyManager.releaseWriteLock(); @@ -651,6 +753,7 @@ public class TopologyBuilder { throws InvalidMemberException, InvalidCartridgeTypeException { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); + //update the status of the member if (service == null) { log.warn(String.format("Service %s does not exist", @@ -665,6 +768,8 @@ public class TopologyBuilder { return; } + String applicationId = service.getCluster(instanceReadyToShutdownEvent.getClusterId()).getAppId(); + String clusterAlias = CloudControllerUtil.getAliasFromClusterId(instanceReadyToShutdownEvent.getClusterId()); Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); if (member == null) { @@ -679,6 +784,8 @@ public class TopologyBuilder { instanceReadyToShutdownEvent.getMemberId(), instanceReadyToShutdownEvent.getNetworkPartitionId(), instanceReadyToShutdownEvent.getPartitionId()); + //member ReadyToShutDown state change time + Long timestamp = null; try { TopologyManager.acquireWriteLock(); @@ -691,18 +798,31 @@ public class TopologyBuilder { log.info("Member Ready to shut down event adding status started"); TopologyManager.updateTopology(topology); + timestamp = System.currentTimeMillis(); } finally { TopologyManager.releaseWriteLock(); } TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); - //publishing data - BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getPartitionId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getServiceName(), - MemberStatus.ReadyToShutDown.toString(), - null); + //publishing member status to DAS. + MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. + createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); + if (memStatusPublisher.isEnabled()) { + if (log.isDebugEnabled()) { + log.debug("Publishing Member Status to DAS"); + } + memStatusPublisher.publish(timestamp, + applicationId, + instanceReadyToShutdownEvent.getClusterId(), + clusterAlias, + instanceReadyToShutdownEvent.getClusterInstanceId(), + instanceReadyToShutdownEvent.getServiceName(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getPartitionId(), + instanceReadyToShutdownEvent.getMemberId(), + MemberStatus.ReadyToShutDown.toString()); + } else { + log.warn("Member Status Publisher is not enabled"); + } //termination of particular instance will be handled by autoscaler } @@ -786,6 +906,8 @@ public class TopologyBuilder { return; } + String applicationId = service.getCluster(clusterId).getAppId(); + String clusterAlias = CloudControllerUtil.getAliasFromClusterId(clusterId); Member member = cluster.getMember(memberId); if (member == null) { log.warn(String.format("Member %s does not exist", @@ -795,6 +917,8 @@ public class TopologyBuilder { String clusterInstanceId = member.getClusterInstanceId(); + //member terminated time + Long timestamp = null; try { TopologyManager.acquireWriteLock(); properties = member.getProperties(); @@ -802,12 +926,34 @@ public class TopologyBuilder { TopologyManager.updateTopology(topology); } finally { TopologyManager.releaseWriteLock(); + timestamp = System.currentTimeMillis(); } /* @TODO leftover from grouping_poc*/ String groupAlias = null; TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId, clusterInstanceId, networkPartitionId, partitionId, properties, groupAlias); + + //publishing member status to DAS. + MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. + createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); + if (memStatusPublisher.isEnabled()) { + if (log.isDebugEnabled()) { + log.debug("Publishing Member Status to DAS"); + } + memStatusPublisher.publish(timestamp, + applicationId, + member.getClusterId(), + clusterAlias, + member.getClusterInstanceId(), + member.getServiceName(), + member.getNetworkPartitionId(), + member.getPartitionId(), + member.getMemberId(), + MemberStatus.Terminated.toString()); + } else { + log.warn("Member Status Publisher is not enabled"); + } } public static void handleMemberSuspended() { @@ -819,7 +965,8 @@ public class TopologyBuilder { } } - public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) { + public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent + clusterStatusClusterActivatedEvent) { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); @@ -855,7 +1002,7 @@ public class TopologyBuilder { Collection<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(clusterStatusClusterActivatedEvent.getInstanceId()); if (kubernetesServices != null) { - + try { // Generate access URLs for kubernetes services for (KubernetesService kubernetesService : kubernetesServices) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java index 37580eb..da23598 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java @@ -31,9 +31,7 @@ import org.apache.stratos.cloud.controller.exception.InvalidPartitionException; import org.apache.stratos.cloud.controller.iaases.Iaas; import org.apache.stratos.cloud.controller.iaases.PartitionValidator; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; -import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; -import org.apache.stratos.messaging.domain.topology.MemberStatus; import java.util.Properties; @@ -49,7 +47,7 @@ public class CloudControllerServiceUtil { } /** - * Update the topology, publish statistics to BAM, remove member context + * Update the topology, publish statistics to DAS, remove member context * and persist cloud controller context. * * @param memberContext @@ -66,15 +64,6 @@ public class CloudControllerServiceUtil { memberContext.getClusterId(), memberContext.getNetworkPartitionId(), partitionId, memberContext.getMemberId()); - // Publish statistics to BAM - BAMUsageDataPublisher.publish(memberContext.getMemberId(), - partitionId, - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Terminated.toString(), - null); - // Remove member context CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), memberContext.getMemberId()); http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java index 77cfea2..afd7a23 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java @@ -27,9 +27,6 @@ import org.apache.stratos.cloud.controller.domain.*; import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException; import org.apache.stratos.cloud.controller.iaases.Iaas; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; -import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; -import org.apache.stratos.messaging.domain.topology.MemberStatus; - import java.util.concurrent.locks.Lock; /** @@ -84,16 +81,6 @@ public class InstanceCreator implements Runnable { // Update topology TopologyBuilder.handleMemberInitializedEvent(memberContext); - - // Publish instance creation statistics to BAM - BAMUsageDataPublisher.publish( - memberContext.getMemberId(), - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Initialized.toString(), - memberContext.getInstanceMetadata()); } catch (Exception e) { String message = String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s", memberContext.getCartridgeType(), memberContext.getClusterId()); http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java deleted file mode 100644 index 56c5f87..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.statistics.publisher; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.config.CloudControllerConfig; -import org.apache.stratos.cloud.controller.context.CloudControllerContext; -import org.apache.stratos.cloud.controller.domain.Cartridge; -import org.apache.stratos.cloud.controller.domain.InstanceMetadata; -import org.apache.stratos.cloud.controller.domain.MemberContext; -import org.apache.stratos.cloud.controller.exception.CloudControllerException; -import org.apache.stratos.cloud.controller.util.CloudControllerConstants; -import org.wso2.carbon.base.ServerConfiguration; -import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; -import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; -import org.wso2.carbon.databridge.commons.Attribute; -import org.wso2.carbon.databridge.commons.AttributeType; -import org.wso2.carbon.databridge.commons.Event; -import org.wso2.carbon.databridge.commons.StreamDefinition; -import org.wso2.carbon.utils.CarbonUtils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -/** - * Usage data publisher for publishing instance usage data to BAM. - */ -public class BAMUsageDataPublisher { - - private static final Log log = LogFactory.getLog(BAMUsageDataPublisher.class); - - private static AsyncDataPublisher dataPublisher; - private static StreamDefinition streamDefinition; - private static final String cloudControllerEventStreamVersion = "1.0.0"; - - public static void publish(String memberId, - String partitionId, - String networkId, - String clusterId, - String serviceName, - String status, - InstanceMetadata metadata) { - if (!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()) { - return; - } - log.debug(CloudControllerConstants.DATA_PUB_TASK_NAME + " cycle started."); - - if (dataPublisher == null) { - createDataPublisher(); - - //If we cannot create a data publisher we should give up - //this means data will not be published - if (dataPublisher == null) { - log.error("Data Publisher cannot be created or found."); - release(); - return; - } - } - - MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId); - String cartridgeType = memberContext.getCartridgeType(); - Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); - - //Construct the data to be published - List<Object> payload = new ArrayList<Object>(); - // Payload values - payload.add(memberId); - payload.add(serviceName); - payload.add(clusterId); - payload.add(handleNull(memberContext.getLbClusterId())); - payload.add(handleNull(partitionId)); - payload.add(handleNull(networkId)); - if (cartridge != null) { - payload.add(handleNull(String.valueOf(cartridge.isMultiTenant()))); - } else { - payload.add(""); - } - payload.add(handleNull(memberContext.getPartition().getProvider())); - payload.add(handleNull(status)); - - if (metadata != null) { - payload.add(metadata.getHostname()); - payload.add(metadata.getHypervisor()); - payload.add(String.valueOf(metadata.getRam())); - payload.add(metadata.getImageId()); - payload.add(metadata.getLoginPort()); - payload.add(metadata.getOperatingSystemName()); - payload.add(metadata.getOperatingSystemVersion()); - payload.add(metadata.getOperatingSystemArchitecture()); - payload.add(String.valueOf(metadata.isOperatingSystem64bit())); - } else { - payload.add(""); - payload.add(""); - payload.add(""); - payload.add(""); - payload.add(0); - payload.add(""); - payload.add(""); - payload.add(""); - payload.add(""); - } - - payload.add(handleNull(Arrays.toString(memberContext.getPrivateIPs()))); - payload.add(handleNull(Arrays.toString(memberContext.getPublicIPs()))); - payload.add(handleNull(Arrays.toString(memberContext.getAllocatedIPs()))); - - Event event = new Event(); - event.setPayloadData(payload.toArray()); - event.setArbitraryDataMap(new HashMap<String, String>()); - - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion())); - } - dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); - } catch (AgentException e) { - if (log.isErrorEnabled()) { - log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e); - } - } - } - - private static void release() { - CloudControllerContext.getInstance().setPublisherRunning(false); - } - - private static StreamDefinition initializeStream() throws Exception { - streamDefinition = new StreamDefinition( - CloudControllerConstants.CLOUD_CONTROLLER_EVENT_STREAM, - cloudControllerEventStreamVersion); - streamDefinition.setNickName("cloud.controller"); - streamDefinition.setDescription("Instances booted up by the Cloud Controller"); - // Payload definition - List<Attribute> payloadData = new ArrayList<Attribute>(); - payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.IAAS_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.STATUS_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.RAM_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_LABEL, AttributeType.INT)); - payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_LABEL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.ALLOCATE_IP_LABEL, AttributeType.STRING)); - streamDefinition.setPayloadData(payloadData); - return streamDefinition; - } - - - private static void createDataPublisher() { - //creating the agent - - ServerConfiguration serverConfig = CarbonUtils.getServerConfiguration(); - String trustStorePath = serverConfig.getFirstProperty("Security.TrustStore.Location"); - String trustStorePassword = serverConfig.getFirstProperty("Security.TrustStore.Password"); - String bamServerUrl = serverConfig.getFirstProperty("BamServerURL"); - String adminUsername = CloudControllerConfig.getInstance().getDataPubConfig().getBamUsername(); - String adminPassword = CloudControllerConfig.getInstance().getDataPubConfig().getBamPassword(); - - System.setProperty("javax.net.ssl.trustStore", trustStorePath); - System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword); - - - try { - dataPublisher = new AsyncDataPublisher("tcp://" + bamServerUrl + "", adminUsername, adminPassword); - CloudControllerContext.getInstance().setDataPublisher(dataPublisher); - initializeStream(); - dataPublisher.addStreamDefinition(streamDefinition); - } catch (Exception e) { - String msg = "Unable to create a data publisher to " + bamServerUrl + - ". Usage Agent will not function properly. "; - log.error(msg, e); - throw new CloudControllerException(msg, e); - } - } - - private static String handleNull(String val) { - if (val == null) { - return ""; - } - return val; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java new file mode 100644 index 0000000..db68396 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cloud.controller.statistics.publisher; + +import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; + +/** + * Creating MemberInformationPublisher. + */ +public class CloudControllerPublisherFactory { + /** + * Create member information publisher + * + * @param type StatisticsPublisherType + * @return MemberInformationPublisher + */ + public static MemberInformationPublisher createMemberInformationPublisher(StatisticsPublisherType type) { + if (type == StatisticsPublisherType.WSO2DAS) { + return new DASMemberInformationPublisher(); + } else { + throw new RuntimeException("Unknown statistics publisher type"); + } + } + + /** + * Create member status publisher + * + * @param type StatisticsPublisherType + * @return MemberStatusPublisher + */ + public static MemberStatusPublisher createMemberStatusPublisher(StatisticsPublisherType type) { + if (type == StatisticsPublisherType.WSO2DAS) { + return new DASMemberStatusPublisher(); + } else { + throw new RuntimeException("Unknown statistics publisher type"); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java new file mode 100644 index 0000000..2bab194 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cloud.controller.statistics.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.context.CloudControllerContext; +import org.apache.stratos.cloud.controller.domain.Cartridge; +import org.apache.stratos.cloud.controller.domain.IaasProvider; +import org.apache.stratos.cloud.controller.domain.InstanceMetadata; +import org.apache.stratos.cloud.controller.domain.MemberContext; +import org.apache.stratos.cloud.controller.util.CloudControllerConstants; +import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.AttributeType; +import org.wso2.carbon.databridge.commons.StreamDefinition; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; + +/** + * MemberInfoPublisher to publish member information/metadata to DAS. + */ +public class DASMemberInformationPublisher extends ThriftStatisticsPublisher implements MemberInformationPublisher { + + private static final Log log = LogFactory.getLog(DASMemberInformationPublisher.class); + + private static final String DATA_STREAM_NAME = "member_info"; + private static final String VERSION = "1.0.0"; + private static final String DAS_THRIFT_CLIENT_NAME = "das"; + private static final String NULL_VALUE = "Value Not Found"; + private ExecutorService executorService; + + public DASMemberInformationPublisher() { + super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME); + executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10); + } + + private static StreamDefinition createStreamDefinition() { + try { + // Create stream definition + StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION); + streamDefinition.setNickName("Member Information"); + streamDefinition.setDescription("Member Information"); + List<Attribute> payloadData = new ArrayList<Attribute>(); + + // Set payload definition + payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.INSTANCE_TYPE_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.SCALING_DECISION_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.ALLOCATED_IP_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CPU_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.RAM_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_COL, AttributeType.INT)); + payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_COL, AttributeType.BOOL)); + streamDefinition.setPayloadData(payloadData); + return streamDefinition; + } catch (Exception e) { + throw new RuntimeException("Could not create stream definition", e); + } + } + + /** + * Publishing member info to DAS. + * + * @param memberId Member Id + * @param scalingDecisionId Scaling Decision Id + * @param metadata InstanceMetadata + */ + @Override + public void publish(final String memberId, final String scalingDecisionId, final InstanceMetadata metadata) { + + Runnable publisher = new Runnable() { + @Override + public void run() { + + if (metadata == null) { + return; + } else { + MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId); + String cartridgeType = memberContext.getCartridgeType(); + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); + IaasProvider iaasProvider = CloudControllerContext.getInstance().getIaasProviderOfPartition( + cartridge.getType(), memberContext.getPartition().getId()); + String instanceType = iaasProvider.getProperty(CloudControllerConstants.INSTANCE_TYPE); + + //adding payload data + List<Object> payload = new ArrayList<Object>(); + payload.add(memberId); + payload.add(handleNull(instanceType)); + payload.add(scalingDecisionId); + payload.add(String.valueOf(cartridge.isMultiTenant())); + payload.add(handleNull(Arrays.toString(memberContext.getPrivateIPs()))); + payload.add(handleNull(Arrays.toString(memberContext.getPublicIPs()))); + payload.add(handleNull(Arrays.toString(memberContext.getAllocatedIPs()))); + payload.add(handleNull(metadata.getHostname())); + payload.add(handleNull(metadata.getHypervisor())); + payload.add(handleNull(metadata.getCpu())); + payload.add(handleNull(metadata.getRam())); + payload.add(handleNull(metadata.getImageId())); + payload.add(metadata.getLoginPort()); + payload.add(handleNull(metadata.getOperatingSystemName())); + payload.add(handleNull(metadata.getOperatingSystemVersion())); + payload.add(handleNull(metadata.getOperatingSystemArchitecture())); + payload.add(Boolean.valueOf(metadata.isOperatingSystem64bit())); + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing member information: [member_id] %s [instance_type] %s " + + "[scaling_decison_id] %s [is_multi_tenant] %s [private_IPs] %s " + + "[public_IPs] %s [allocated_IPs] %s [host_name] %s [hypervisor] %s [cpu] %s " + + "[ram] %s [image_id] %s [login_port] %d [os_name] %s " + + "[os_version] %s [os_arch] %s [is_os_64bit] %b", + memberId, instanceType, scalingDecisionId, String.valueOf(cartridge.isMultiTenant()), + memberContext.getPrivateIPs(), memberContext.getPublicIPs(), + memberContext.getAllocatedIPs(), metadata.getHostname(), metadata.getHypervisor(), + metadata.getCpu(), metadata.getRam(), metadata.getImageId(), metadata.getLoginPort(), + metadata.getOperatingSystemName(), metadata.getOperatingSystemVersion(), + metadata.getOperatingSystemArchitecture(), metadata.isOperatingSystem64bit())); + } + DASMemberInformationPublisher.super.publish(payload.toArray()); + } + } + }; + executorService.execute(publisher); + } + + public static String handleNull(String param) { + if (null != param) { + return param; + } + return NULL_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java new file mode 100644 index 0000000..877256d --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cloud.controller.statistics.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.util.CloudControllerConstants; +import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.AttributeType; +import org.wso2.carbon.databridge.commons.StreamDefinition; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +/** + * Publishing member status to DAS. + */ +public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implements MemberStatusPublisher { + + private static final Log log = LogFactory.getLog(DASMemberStatusPublisher.class); + private static final String DATA_STREAM_NAME = "member_lifecycle"; + private static final String VERSION = "1.0.0"; + private static final String DAS_THRIFT_CLIENT_NAME = "das"; + private ExecutorService executorService; + + public DASMemberStatusPublisher() { + super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME); + executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10); + } + + private static StreamDefinition createStreamDefinition() { + try { + // Create stream definition + StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION); + streamDefinition.setNickName("Member Lifecycle"); + streamDefinition.setDescription("Member Lifecycle"); + List<Attribute> payloadData = new ArrayList<Attribute>(); + + // Set payload definition + payloadData.add(new Attribute(CloudControllerConstants.TIMESTAMP_COL, AttributeType.LONG)); + payloadData.add(new Attribute(CloudControllerConstants.APPLICATION_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ALIAS_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_INSTANCE_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.SERVICE_NAME_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.NETWORK_PARTITION_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.MEMBER_STATUS_COL, AttributeType.STRING)); + streamDefinition.setPayloadData(payloadData); + return streamDefinition; + } catch (Exception e) { + throw new RuntimeException("Could not create stream definition", e); + } + } + + /** + * publishing Member Status to DAS. + * + * @param timestamp Status changed time + * @param applicationId Application Id + * @param clusterId Cluster Id + * @param clusterAlias Cluster Alias + * @param clusterInstanceId Cluster Instance Id + * @param networkPartitionId Network Partition Id + * @param partitionId Partition Id + * @param serviceName Service Name + * @param memberId Member Id + * @param status Member Status + * @parm tenantId Tenant Id + */ + @Override + public void publish(final Long timestamp, final String applicationId, final String clusterId, + final String clusterAlias, final String clusterInstanceId, + final String serviceName, final String networkPartitionId, final String partitionId, + final String memberId, final String status) { + + Runnable publisher = new Runnable() { + @Override + public void run() { + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing member status: [timestamp] %d application_id] %s " + + "[cluster_id] %s [cluster_alias] %s [cluster_instance_id] %s [service_name] %s " + + "[network_partition_id] %s [partition_id] %s " + + "[member_id] %s [member_status] %s ", + timestamp, applicationId, clusterId, clusterAlias, clusterInstanceId, serviceName, + networkPartitionId, partitionId, memberId, status)); + } + //adding payload data + List<Object> payload = new ArrayList<Object>(); + payload.add(timestamp); + payload.add(applicationId); + payload.add(clusterId); + payload.add(clusterAlias); + payload.add(clusterInstanceId); + payload.add(serviceName); + payload.add(networkPartitionId); + payload.add(partitionId); + payload.add(memberId); + payload.add(status); + DASMemberStatusPublisher.super.publish(payload.toArray()); + } + }; + executorService.execute(publisher); + } + +}
