Repository: stratos Updated Branches: refs/heads/tenant-isolation 4fdd431f0 -> 7575faa30
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/sparkscript/CCEvent ---------------------------------------------------------------------- diff --git a/extensions/das/artifacts/sparkscript/CCEvent b/extensions/das/artifacts/sparkscript/CCEvent new file mode 100644 index 0000000..b8bb6fd --- /dev/null +++ b/extensions/das/artifacts/sparkscript/CCEvent @@ -0,0 +1,18 @@ +CREATE TEMPORARY TABLE memberstatus +USING CarbonAnalytics +OPTIONS (tableName "ORG_APACHE_STRATOS_CLOUD_CONTROLLER"); + +CREATE TEMPORARY TABLE memberstatusnew +USING CarbonAnalytics +OPTIONS (tableName "CLUSTER_MEMBER_NEW", + schema "startTime String, endTime String, clusterId STRING, activatedInstanceCount INT, terminatedInstanceCount INT, activeInstanceCount INT"); + +;WITH InstanceCount as +(select clusterId, count(case when status='Active' and timeStamp > current_time(null)-60000 and timeStamp <= current_time(null) then 1 else NULL end) as activatedInstanceCount, count(case when status='Terminated' and timeStamp > current_time(null)-60000 and timeStamp <= current_time(null) then 1 else NULL end) as terminatedInstanceCount, (sum(case when status='Active' then 1 else 0 end) - sum(case when status='Terminated' then 1 else 0 end))as activeInstanceCount from memberstatus group by clusterId) +INSERT INTO table memberstatusnew select time(current_time(null)-60000),time(current_time(null)),clusterId, activatedInstanceCount, terminatedInstanceCount,activeInstanceCount from InstanceCount; + +CREATE TEMPORARY TABLE membersnew +USING CarbonAnalytics +OPTIONS (tableName "MEMBER_NEW",schema "clusterId STRING, clusterInstanceId STRING, networkId STRING, partitionId STRING, cartridgeType STRING, instanceType STRING, memberId STRING, scalingTime LONG,scalingReason STRING, timeStamp STRING"); + +INSERT INTO TABLE membersnew select clusterId,clusterInstanceId,networkId,partitionId,cartridgeType,instanceType, memberId,scalingTime,scalingReason,time(timeStamp)as timeStamp FROM 
memberstatus where status='Created'; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/das/pom.xml b/extensions/das/pom.xml new file mode 100644 index 0000000..d21d1be --- /dev/null +++ b/extensions/das/pom.xml @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>stratos-extensions</artifactId> + <groupId>org.apache.stratos</groupId> + <version>4.1.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.stratos</groupId> + <artifactId>strats-das-extension</artifactId> + <packaging>pom</packaging> + <name>Apache Stratos - DAS Extension</name> + <description>Apache Stratos extensions for DAS.</description> + <modules> + <module>spark-udf</module> + </modules> + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/spark-udf/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/das/spark-udf/pom.xml b/extensions/das/spark-udf/pom.xml new file mode 100644 index 0000000..ced0f0a --- /dev/null +++ b/extensions/das/spark-udf/pom.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>strats-das-extension</artifactId> + <groupId>org.apache.stratos</groupId> + <version>4.1.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.stratos</groupId> + <artifactId>apache-stratos-spark-udf</artifactId> + <name>Apache Stratos - Spark UDF</name> + <description>Apache Stratos Spark UDF for DAS</description> + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java ---------------------------------------------------------------------- diff --git a/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java b/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java new file mode 100644 index 0000000..0b8f408 --- /dev/null +++ b/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.das.extension.spark.udf; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * User-defined functions (UDFs) exposing time helpers to Spark SQL queries. + */ +public class TimeUDF { + /** + * Format a timestamp given in ms as a "yyyy-MM-dd HH:mm" string. + * + * @param timeStamp time in ms since the epoch; must not be null because it is unboxed with longValue() + * @return the formatted date as a String + */ + public String time(Long timeStamp) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); + Date date = new Date(timeStamp.longValue()); + return sdf.format(date); + } + + /** + * Get the current time in ms, as reported by System.currentTimeMillis(). + * + * @param param not read by the method body + * @return the current time in ms + */ + public long current_time(Integer param) { + return System.currentTimeMillis(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/pom.xml b/extensions/pom.xml index ffbfa8e..1cc9038 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -17,7 +17,8 @@ ~ specific language governing permissions and limitations ~ under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <parent> <groupId>org.apache.stratos</groupId> @@ -36,6 +37,7 @@ <module>cep/stratos-cep-extension</module> <module>cep/distribution/</module> <module>load-balancer</module> + <module>das</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl index 56e9164..a8102da 100644 --- a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl @@ -84,7 +84,9 @@ dialect "mvel" log.info("[dependency-scale] [scale-up] Partition available, hence trying to spawn an instance to scale up!" 
); log.debug("[dependency-scale] [scale-up] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId ); - delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary); + long scalingTime = System.currentTimeMillis(); + String scalingReason = "Dependency scaling"; + delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,scalingReason,scalingTime); count++; } else { partitionsAvailable = false; http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl index 96b60da..4eaab2b 100755 --- a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl @@ -84,7 +84,10 @@ dialect "mvel" log.info("[min-check] Partition available, hence trying to spawn an instance to fulfil minimum count!" 
+ " [cluster] " + clusterId); log.debug("[min-check] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId); - delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary); + long scalingTime = System.currentTimeMillis(); + String scalingReason = "Scaling up to fulfil minimum count, [Cluster Min Members] "+clusterInstanceContext.getMinInstanceCount()+" [Additional instances to be created] " + additionalInstances; + delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,scalingReason,scalingTime); + count++; } else { http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl index e6f8f67..3b4a916 100644 --- a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl @@ -164,6 +164,10 @@ dialect "mvel" boolean partitionsAvailable = true; int count = 0; + String autoscalingReason = (numberOfRequiredInstances == numberOfInstancesReuquiredBasedOnRif)?"Scaling up due to RIF, [Predicted Value] "+rifPredictedValue+" [Threshold] "+rifThreshold:(numberOfRequiredInstances== numberOfInstancesReuquiredBasedOnMemoryConsumption)?"Scaling up due to MC, [Predicted Value] "+mcPredictedValue+" [Threshold] "+mcThreshold:"Scaling up due to LA, [Predicted Value] "+laPredictedValue+" [Threshold] "+laThreshold; + autoscalingReason += " [Number of required instances] "+numberOfRequiredInstances+" [Cluster Max Members] "+clusterMaxMembers+" [Additional instances to be created] " + additionalInstances; + + while(count != additionalInstances && partitionsAvailable){ ClusterLevelPartitionContext 
partitionContext = (ClusterLevelPartitionContext) partitionAlgorithm.getNextScaleUpPartitionContext(clusterInstanceContext.getPartitionCtxtsAsAnArray()); @@ -182,7 +186,8 @@ dialect "mvel" " [laPredictedValue] " + laPredictedValue + " [laThreshold] " + laThreshold); log.debug("[scale-up] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId ); - delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary); + long scalingTime = System.currentTimeMillis(); + delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,autoscalingReason,scalingTime); count++; } else {
