This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 800cadd0f [VL] Support Spark transform_keys, transform_values function (#6095)
800cadd0f is described below
commit 800cadd0f4f71d0ebedb5fbf6428442ae52b77ac
Author: 高阳阳 <[email protected]>
AuthorDate: Tue Jun 18 21:16:52 2024 +0800
[VL] Support Spark transform_keys, transform_values function (#6095)
---
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 5 ++-
.../execution/ScalarFunctionsValidateSuite.scala | 40 ++++++++++++++++++++++
.../gluten/expression/ExpressionConverter.scala | 15 ++++++++
.../apache/gluten/expression/ExpressionNames.scala | 2 ++
4 files changed, 61 insertions(+), 1 deletion(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 26b4c5082..ebf82ea76 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.datasource.ArrowConvertorRule
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
+import org.apache.gluten.expression.ExpressionNames.{TRANSFORM_KEYS,
TRANSFORM_VALUES}
import org.apache.gluten.expression.aggregate.{HLLAdapter,
VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar.TransformHints
@@ -854,7 +855,9 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
Sig[VeloxCollectList](ExpressionNames.COLLECT_LIST),
Sig[VeloxCollectSet](ExpressionNames.COLLECT_SET),
Sig[VeloxBloomFilterMightContain](ExpressionNames.MIGHT_CONTAIN),
- Sig[VeloxBloomFilterAggregate](ExpressionNames.BLOOM_FILTER_AGG)
+ Sig[VeloxBloomFilterAggregate](ExpressionNames.BLOOM_FILTER_AGG),
+ Sig[TransformKeys](TRANSFORM_KEYS),
+ Sig[TransformValues](TRANSFORM_VALUES)
)
}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
index a23fdf243..9718b8e73 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
@@ -537,6 +537,46 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest {
}
}
+ test("test transform_keys function") {
+ withTempPath {
+ path =>
+ Seq(
+ Map[String, Int]("a" -> 1, "b" -> 2),
+ Map[String, Int]("a" -> 2, "b" -> 3),
+ null
+ )
+ .toDF("m")
+ .write
+ .parquet(path.getCanonicalPath)
+
+
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("map_tbl")
+
+ runQueryAndCompare("select transform_keys(m, (k, v) -> upper(k)) from
map_tbl") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ }
+ }
+
+ test("test transform_values function") {
+ withTempPath {
+ path =>
+ Seq(
+ Map[String, Int]("a" -> 1, "b" -> 2),
+ Map[String, Int]("a" -> 2, "b" -> 3),
+ null
+ )
+ .toDF("m")
+ .write
+ .parquet(path.getCanonicalPath)
+
+
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("map_tbl")
+
+ runQueryAndCompare("select transform_values(m, (k, v) -> v + 1) from
map_tbl") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ }
+ }
+
test("zip_with") {
withTempPath {
path =>
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 464bbbfd0..b7b0889dc 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -656,6 +656,21 @@ object ExpressionConverter extends SQLConfHelper with Logging {
Seq(replaceWithExpressionTransformerInternal(c.child, attributeSeq,
expressionsMap)),
c
)
+ case t: TransformKeys =>
+ // default is `EXCEPTION`
+ val mapKeyDedupPolicy =
SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)
+ if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
+ // TODO: Remove after fix ready for
+ // https://github.com/facebookincubator/velox/issues/10219
+ throw new GlutenNotSupportException(
+ "LAST_WIN policy is not supported yet in native to deduplicate map
keys"
+ )
+ }
+ GenericExpressionTransformer(
+ substraitExprName,
+ t.children.map(replaceWithExpressionTransformerInternal(_,
attributeSeq, expressionsMap)),
+ t
+ )
case expr =>
GenericExpressionTransformer(
substraitExprName,
diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index 20db38018..112fa677d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -276,6 +276,8 @@ object ExpressionNames {
final val MAP_FROM_ARRAYS = "map_from_arrays"
final val MAP_ENTRIES = "map_entries"
final val MAP_ZIP_WITH = "map_zip_with"
+ final val TRANSFORM_KEYS = "transform_keys"
+ final val TRANSFORM_VALUES = "transform_values"
final val STR_TO_MAP = "str_to_map"
// struct functions
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]