This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9b9f8bea4 [VL] Support skewness aggregate function (#4939)
9b9f8bea4 is described below
commit 9b9f8bea48f3959769b3045c08f41fb1b1ed8f2f
Author: Joey <[email protected]>
AuthorDate: Thu Mar 14 11:59:32 2024 +0800
[VL] Support skewness aggregate function (#4939)
---
.../io/glutenproject/utils/CHExpressionUtil.scala | 3 +-
.../execution/HashAggregateExecTransformer.scala | 45 ++++++++++++++--------
.../utils/VeloxIntermediateData.scala | 21 +++++++---
.../execution/VeloxAggregateFunctionsSuite.scala | 18 +++++++++
.../substrait/SubstraitToVeloxPlanValidator.cc | 3 +-
docs/velox-backend-support-progress.md | 8 ++--
.../expression/ExpressionMappings.scala | 3 +-
.../glutenproject/expression/ExpressionNames.scala | 1 +
8 files changed, 75 insertions(+), 27 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala
b/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala
index 41ebad137..31f2cd533 100644
---
a/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala
+++
b/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala
@@ -174,6 +174,7 @@ object CHExpressionUtil {
ENCODE -> EncodeDecodeValidator(),
ARRAY_EXCEPT -> DefaultValidator(),
ARRAY_REPEAT -> DefaultValidator(),
- DATE_FROM_UNIX_DATE -> DefaultValidator()
+ DATE_FROM_UNIX_DATE -> DefaultValidator(),
+ SKEWNESS -> DefaultValidator()
)
}
diff --git
a/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala
b/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala
index cc7a1a852..565edd9f6 100644
---
a/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala
+++
b/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala
@@ -386,23 +386,38 @@ abstract class HashAggregateExecTransformer(
val adjustedOrders = veloxOrders.map(sparkOrders.indexOf(_))
veloxTypes.zipWithIndex.foreach {
case (veloxType, idx) =>
- val sparkType = sparkTypes(adjustedOrders(idx))
- val attr = rewrittenInputAttributes(adjustedOrders(idx))
- val aggFuncInputAttrNode = ExpressionConverter
- .replaceWithExpressionTransformer(attr,
originalInputAttributes)
- .doTransform(args)
- val expressionNode = if (sparkType != veloxType) {
- newInputAttributes +=
- attr.copy(dataType = veloxType)(attr.exprId,
attr.qualifier)
- ExpressionBuilder.makeCast(
- ConverterUtils.getTypeNode(veloxType, attr.nullable),
- aggFuncInputAttrNode,
- SQLConf.get.ansiEnabled)
+ val adjustedIdx = adjustedOrders(idx)
+ if (adjustedIdx == -1) {
+ // This Velox aggregate intermediate buffer column is not
found in Spark.
+ // For example, skewness and kurtosis share the same
aggregate buffer in Velox,
+ // and kurtosis additionally requires the m4 buffer column,
which is
+ // always 0 for skewness. In Spark, the aggregate buffer
of skewness does not
+ // have an m4 column, so a placeholder m4 with a
value of 0 must be passed
+ // to Velox; this value cannot be omitted because Velox
always reads the m4 column
+ // when accessing the intermediate data.
+ val extraAttr = AttributeReference(veloxOrders(idx),
veloxType)()
+ newInputAttributes += extraAttr
+ val lt = Literal.default(veloxType)
+ childNodes.add(ExpressionBuilder.makeLiteral(lt.value,
lt.dataType, false))
} else {
- newInputAttributes += attr
- aggFuncInputAttrNode
+ val sparkType = sparkTypes(adjustedIdx)
+ val attr = rewrittenInputAttributes(adjustedIdx)
+ val aggFuncInputAttrNode = ExpressionConverter
+ .replaceWithExpressionTransformer(attr,
originalInputAttributes)
+ .doTransform(args)
+ val expressionNode = if (sparkType != veloxType) {
+ newInputAttributes +=
+ attr.copy(dataType = veloxType)(attr.exprId,
attr.qualifier)
+ ExpressionBuilder.makeCast(
+ ConverterUtils.getTypeNode(veloxType, attr.nullable),
+ aggFuncInputAttrNode,
+ SQLConf.get.ansiEnabled)
+ } else {
+ newInputAttributes += attr
+ aggFuncInputAttrNode
+ }
+ childNodes.add(expressionNode)
}
- childNodes.add(expressionNode)
}
exprNodes.add(getRowConstructNode(args, childNodes,
newInputAttributes, aggFunc))
case other =>
diff --git
a/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala
b/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala
index faead2ad1..773fedfe9 100644
---
a/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala
+++
b/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala
@@ -27,18 +27,25 @@ import scala.collection.JavaConverters._
object VeloxIntermediateData {
// Agg functions with inconsistent ordering of intermediate data between
Velox and Spark.
// Corr
- val veloxCorrIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xMk",
"yMk", "xAvg", "yAvg")
+ private val veloxCorrIntermediateDataOrder: Seq[String] =
+ Seq("ck", "n", "xMk", "yMk", "xAvg", "yAvg")
// CovPopulation, CovSample
- val veloxCovarIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xAvg",
"yAvg")
+ private val veloxCovarIntermediateDataOrder: Seq[String] = Seq("ck", "n",
"xAvg", "yAvg")
+ // Skewness
+ private val veloxSkewnessIntermediateDataOrder: Seq[String] = Seq("n",
"avg", "m2", "m3", "m4")
// Agg functions with inconsistent types of intermediate data between Velox
and Spark.
// StddevSamp, StddevPop, VarianceSamp, VariancePop
- val veloxVarianceIntermediateTypes: Seq[DataType] = Seq(LongType,
DoubleType, DoubleType)
+ private val veloxVarianceIntermediateTypes: Seq[DataType] = Seq(LongType,
DoubleType, DoubleType)
// CovPopulation, CovSample
- val veloxCovarIntermediateTypes: Seq[DataType] = Seq(DoubleType, LongType,
DoubleType, DoubleType)
+ private val veloxCovarIntermediateTypes: Seq[DataType] =
+ Seq(DoubleType, LongType, DoubleType, DoubleType)
// Corr
- val veloxCorrIntermediateTypes: Seq[DataType] =
+ private val veloxCorrIntermediateTypes: Seq[DataType] =
Seq(DoubleType, LongType, DoubleType, DoubleType, DoubleType, DoubleType)
+ // Skewness
+ private val veloxSkewnessIntermediateTypes: Seq[DataType] =
+ Seq(LongType, DoubleType, DoubleType, DoubleType, DoubleType)
/**
* Return the intermediate columns order of Velox aggregation functions,
with special matching
@@ -55,6 +62,8 @@ object VeloxIntermediateData {
veloxCorrIntermediateDataOrder
case _: CovPopulation | _: CovSample =>
veloxCovarIntermediateDataOrder
+ case _: Skewness =>
+ veloxSkewnessIntermediateDataOrder
case _ =>
aggFunc.aggBufferAttributes.map(_.name)
}
@@ -134,6 +143,8 @@ object VeloxIntermediateData {
Some(veloxCovarIntermediateTypes)
case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop =>
Some(veloxVarianceIntermediateTypes)
+ case _: Skewness =>
+ Some(veloxSkewnessIntermediateTypes)
case _ if aggFunc.aggBufferAttributes.size > 1 =>
Some(aggFunc.aggBufferAttributes.map(_.dataType))
case _ => None
diff --git
a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala
b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala
index 48b0a36c0..26bea5b1c 100644
---
a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala
+++
b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala
@@ -857,6 +857,24 @@ abstract class VeloxAggregateFunctionsSuite extends
VeloxWholeStageTransformerSu
|""".stripMargin)(
df =>
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecTransformer])
== 2))
}
+
+ test("skewness") {
+ runQueryAndCompare("""
+ |select skewness(l_partkey) from lineitem;
+ |""".stripMargin) {
+ checkOperatorMatch[HashAggregateExecTransformer]
+ }
+ runQueryAndCompare("select skewness(l_partkey), count(distinct l_orderkey)
from lineitem") {
+ df =>
+ {
+ assert(
+ getExecutedPlan(df).count(
+ plan => {
+ plan.isInstanceOf[HashAggregateExecTransformer]
+ }) == 4)
+ }
+ }
+ }
}
class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite
{
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 0e81ba91a..3826465b3 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -1138,7 +1138,8 @@ bool SubstraitToVeloxPlanValidator::validate(const
::substrait::AggregateRel& ag
"corr",
"covar_pop",
"covar_samp",
- "approx_distinct"};
+ "approx_distinct",
+ "skewness"};
for (const auto& funcSpec : funcSpecs) {
auto funcName = SubstraitParser::getNameBeforeDelimiter(funcSpec);
diff --git a/docs/velox-backend-support-progress.md
b/docs/velox-backend-support-progress.md
index 4dcd27a34..639211e11 100644
--- a/docs/velox-backend-support-progress.md
+++ b/docs/velox-backend-support-progress.md
@@ -87,7 +87,7 @@ Gluten supports 28 operators (Draw to right to see all data
types)
Gluten supports 199 functions. (Draw to right to see all data types)
| Spark Functions | Velox/Presto Functions | Velox/Spark
functions | Gluten | Restrictions | BOOLEAN | BYTE | SHORT | INT |
LONG | FLOAT | DOUBLE | DATE | TIMESTAMP | STRING | DECIMAL | NULL | BINARY |
CALENDAR | ARRAY | MAP | STRUCT | UDT |
-|-------------------------------|------------------------|-----------------------|--------|------------------------|---------|------|-------|-----|------|-------|--------|------|-----------|--------|---------|------|--------|
-------- |-------| ---- | ------ | ---- |
+|-------------------------------|------------------------|-----------------------|--------|------------------------|---------|------|-------|-----|------|-------|--------|------|-----------|--------|---------|------|--------|
-------- |-------| ---- |--------| ---- |
| ! | | not
| S | | S | S | S | S | S | S
| S | | | S | | | | |
| | | |
| != | neq |
| S | | S | S | S | S | S | S
| S | | | S | | | | |
| | | |
| % | mod | remainder
| S | Ansi Off | | S | S | S | S | S
| | | | | | | | |
| | | |
@@ -372,7 +372,7 @@ Gluten supports 199 functions. (Draw to right to see all
data types)
| mean | avg |
| S | Ansi Off | | | | | |
| | | | | | | | |
| | | |
| min | min |
| S | | | | S | S | S | S
| S | | | | | | | |
| | | |
| min_by | |
| S | | | | | | |
| | | | | | | | |
| | | |
-| skewness | |
| | | | | | | |
| | | | | | | | |
| | | |
+| skewness | skewness | skewness
| S | | | | S | S | S | S
| S | | | | | | | |
| | | |
| some | |
| | | | | | | |
| | | | | | | | |
| | | |
| std,stddev | stddev |
| S | | | | S | S | S | S
| S | | | | | | | |
| | | |
| stddev,std | stddev |
| S | | | | S | S | S | S
| S | | | | | | | |
| | | |
@@ -387,7 +387,7 @@ Gluten supports 199 functions. (Draw to right to see all
data types)
| lag | |
| | | | | | | |
| | | | | | | | |
| | | |
| lead | |
| | | | | | | |
| | | | | | | | |
| | | |
| nth_value | nth_value | nth_value
| PS | | | | | | |
| | | | | | | | |
| | | |
-| ntile | ntile | ntile
| S | | | | | |
| | | | | | | | |
| | | | |
+| ntile | ntile | ntile
| S | | | | | | |
| | | | | | | | |
| | | |
| percent_rank | percent_rank |
| S | | | | | | |
| | | | | | | | |
| | | |
| rank | rank |
| S | | | | | | |
| | | | | | | | |
| | | |
| row_number | row_number |
| S | | | | S | S | S |
| | | | | | | | |
| | | |
@@ -404,7 +404,7 @@ Gluten supports 199 functions. (Draw to right to see all
data types)
| coalesce | |
| PS | | | | | | |
| | | | | | | | |
| | | |
| crc32 | crc32 |
| S | | | | | | |
| | | | S | | | | |
| | | |
| current_user | |
| S* | | | | | | |
| | | | S | | | | |
| | | |
-| current_catalog | |
| S | | | | | | |
| | | | | | | | |
| | | |
+| current_catalog | |
| S | | | | | | |
| | | | | | | | |
| | | |
| current_database | |
| S | | | | | | |
| | | | | | | | |
| | | |
| greatest | greatest | greatest
| S | | | | | | S | S
| S | S | S | | | | | |
| | | |
| hash | hash | hash
| S | | S | S | S | S | S | S
| S | | | | | | | |
| | | |
diff --git
a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
index a063c244b..6fb9d80b0 100644
---
a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
@@ -269,7 +269,8 @@ object ExpressionMappings {
Sig[CovPopulation](COVAR_POP),
Sig[CovSample](COVAR_SAMP),
Sig[Last](LAST),
- Sig[First](FIRST)
+ Sig[First](FIRST),
+ Sig[Skewness](SKEWNESS)
)
/** Mapping Spark window expression to Substrait function name */
diff --git
a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
index d2fc4f9ec..c5a00b51f 100644
---
a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
+++
b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
@@ -45,6 +45,7 @@ object ExpressionNames {
final val FIRST = "first"
final val FIRST_IGNORE_NULL = "first_ignore_null"
final val APPROX_DISTINCT = "approx_distinct"
+ final val SKEWNESS = "skewness"
// Function names used by Substrait plan.
final val ADD = "add"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]