This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 41ca811b3 [CORE] Introduce aggregateExpressionMappings interface in Spark shims (#5154)
41ca811b3 is described below
commit 41ca811b3aeff59532b2c0730cd0448f27577e4e
Author: Joey <[email protected]>
AuthorDate: Wed Mar 27 20:11:22 2024 +0800
[CORE] Introduce aggregateExpressionMappings interface in Spark shims (#5154)
---
.../main/scala/io/glutenproject/expression/ExpressionMappings.scala | 6 +++---
.../src/main/scala/io/glutenproject/sql/shims/SparkShims.scala | 4 +++-
.../scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala | 4 +++-
.../scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala | 4 +++-
.../scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala | 5 +++--
.../scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala | 5 +++--
6 files changed, 18 insertions(+), 10 deletions(-)
diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
index a286ed556..180597ebf 100644
--- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
@@ -248,9 +248,9 @@ object ExpressionMappings {
Sig[SparkPartitionID](SPARK_PARTITION_ID),
// Decimal
Sig[UnscaledValue](UNSCALED_VALUE)
- ) ++ SparkShimLoader.getSparkShims.expressionMappings
+ ) ++ SparkShimLoader.getSparkShims.scalarExpressionMappings
- /** Mapping Spark aggregation expression to Substrait function name */
+ /** Mapping Spark aggregate expression to Substrait function name */
private val AGGREGATE_SIGS: Seq[Sig] = Seq(
Sig[Sum](SUM),
Sig[Average](AVG),
@@ -275,7 +275,7 @@ object ExpressionMappings {
Sig[Last](LAST),
Sig[First](FIRST),
Sig[Skewness](SKEWNESS)
- )
+ ) ++ SparkShimLoader.getSparkShims.aggregateExpressionMappings
/** Mapping Spark window expression to Substrait function name */
private val WINDOW_SIGS: Seq[Sig] = Seq(
diff --git a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
index e1fcb7b91..df1f3e451 100644
--- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
@@ -61,7 +61,9 @@ trait SparkShims {
// https://github.com/apache/spark/pull/32875
def getDistribution(leftKeys: Seq[Expression], rightKeys: Seq[Expression]):
Seq[Distribution]
- def expressionMappings: Seq[Sig]
+ def scalarExpressionMappings: Seq[Sig]
+
+ def aggregateExpressionMappings: Seq[Sig]
def convertPartitionTransforms(partitions: Seq[Transform]): (Seq[String],
Option[BucketSpec])
diff --git a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
index 8f028968e..5731ff957 100644
--- a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
@@ -56,7 +56,9 @@ class Spark32Shims extends SparkShims {
HashClusteredDistribution(leftKeys) ::
HashClusteredDistribution(rightKeys) :: Nil
}
- override def expressionMappings: Seq[Sig] = Seq(Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
+ override def scalarExpressionMappings: Seq[Sig] = Seq(Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
+
+ override def aggregateExpressionMappings: Seq[Sig] = Seq.empty
override def convertPartitionTransforms(
partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
diff --git a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
index c0a5f2e68..54799f498 100644
--- a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
@@ -61,7 +61,7 @@ class Spark33Shims extends SparkShims {
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
}
- override def expressionMappings: Seq[Sig] = {
+ override def scalarExpressionMappings: Seq[Sig] = {
val list = if (GlutenConfig.getConf.enableNativeBloomFilter) {
Seq(
Sig[BloomFilterMightContain](ExpressionNames.MIGHT_CONTAIN),
@@ -76,6 +76,8 @@ class Spark33Shims extends SparkShims {
)
}
+ override def aggregateExpressionMappings: Seq[Sig] = Seq.empty
+
override def convertPartitionTransforms(
partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
CatalogUtil.convertPartitionTransforms(partitions)
diff --git a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
index 13efff291..027e88cbc 100644
--- a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
@@ -62,7 +62,7 @@ class Spark34Shims extends SparkShims {
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
}
- override def expressionMappings: Seq[Sig] = {
+ override def scalarExpressionMappings: Seq[Sig] = {
val list = if (GlutenConfig.getConf.enableNativeBloomFilter) {
Seq(
Sig[BloomFilterMightContain](ExpressionNames.MIGHT_CONTAIN),
@@ -73,9 +73,10 @@ class Spark34Shims extends SparkShims {
Sig[Sec](ExpressionNames.SEC),
Sig[Csc](ExpressionNames.CSC),
Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
-
}
+ override def aggregateExpressionMappings: Seq[Sig] = Seq.empty
+
override def convertPartitionTransforms(
partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
CatalogUtil.convertPartitionTransforms(partitions)
diff --git a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
index 8bea501c0..93dce4fdc 100644
--- a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
@@ -61,7 +61,7 @@ class Spark35Shims extends SparkShims {
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
}
- override def expressionMappings: Seq[Sig] = {
+ override def scalarExpressionMappings: Seq[Sig] = {
val list = if (GlutenConfig.getConf.enableNativeBloomFilter) {
Seq(
Sig[BloomFilterMightContain](ExpressionNames.MIGHT_CONTAIN),
@@ -72,9 +72,10 @@ class Spark35Shims extends SparkShims {
Sig[Sec](ExpressionNames.SEC),
Sig[Csc](ExpressionNames.CSC),
Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
-
}
+ override def aggregateExpressionMappings: Seq[Sig] = Seq.empty
+
override def convertPartitionTransforms(
partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
CatalogUtil.convertPartitionTransforms(partitions)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]