This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b274a9072 [GLUTEN-4039][VL] Add array filter function support (#5334)
b274a9072 is described below
commit b274a9072795eed3fdb34e3d65cd1007a76ce2b6
Author: Tengfei Huang <[email protected]>
AuthorDate: Wed Apr 10 16:21:10 2024 +0800
[GLUTEN-4039][VL] Add array filter function support (#5334)
[GLUTEN-4039][VL] Add array filter function support.
---
.../backendsapi/velox/SparkPlanExecApiImpl.scala | 16 +++++++++++++++-
.../execution/ScalarFunctionsValidateSuite.scala | 20 ++++++++++++++++++++
cpp/velox/substrait/SubstraitToVeloxExpr.cc | 2 +-
docs/velox-backend-support-progress.md | 2 +-
.../apache/gluten/backendsapi/SparkPlanExecApi.scala | 9 +++++++++
.../gluten/expression/ExpressionConverter.scala | 7 +++++++
.../gluten/expression/ExpressionMappings.scala | 1 +
.../apache/gluten/expression/ExpressionNames.scala | 1 +
8 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
index 82c4bdde8..715a2eeca 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.{AggregateFunctionRewriteRule, FlushableHas
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute,
Cast, CreateNamedStruct, ElementAt, Expression, ExpressionInfo, Generator,
GetArrayItem, GetMapValue, GetStructField, If, IsNaN, Literal, Murmur3Hash,
NamedExpression, NaNvl, PosExplode, Round, SortOrder, StringSplit, StringTrim,
Uuid}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayFilter,
Ascending, Attribute, Cast, CreateNamedStruct, ElementAt, Expression,
ExpressionInfo, Generator, GetArrayItem, GetMapValue, GetStructField, If,
IsNaN, LambdaFunction, Literal, Murmur3Hash, NamedExpression, NaNvl,
PosExplode, Round, SortOrder, StringSplit, StringTrim, Uuid}
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
HLLAdapter}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.JoinType
@@ -147,6 +147,20 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
GenericExpressionTransformer(substraitExprName, Seq(child), expr)
}
+ /** Transform array filter to Substrait. */
+ override def genArrayFilterTransformer(
+ substraitExprName: String,
+ argument: ExpressionTransformer,
+ function: ExpressionTransformer,
+ expr: ArrayFilter): ExpressionTransformer = {
+ expr.function match {
+ case LambdaFunction(_, arguments, _) if arguments.size == 2 =>
+ throw new GlutenNotSupportException(
+ "filter on array with lambda using index argument is not supported
yet")
+ case _ => GenericExpressionTransformer(substraitExprName, Seq(argument,
function), expr)
+ }
+ }
+
/** Transform posexplode to Substrait. */
override def genPosExplodeTransformer(
substraitExprName: String,
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
index d8e789d1c..9cfe44a8c 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
@@ -727,4 +727,24 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest {
}
}
+ test("test array filter") {
+ withTempPath {
+ path =>
+ Seq[Seq[Integer]](Seq(1, null, 5, 4), Seq(5, -1, 8, 9, -7, 2),
Seq.empty, null)
+ .toDF("value")
+ .write
+ .parquet(path.getCanonicalPath)
+
+ spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("array_tbl")
+
+ runQueryAndCompare("select filter(value, x -> x % 2 == 1) as res from
array_tbl;") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+
+ runQueryAndCompare("select filter(value, x -> x is not null) as res
from array_tbl;") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ }
+ }
+
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxExpr.cc b/cpp/velox/substrait/SubstraitToVeloxExpr.cc
index 02d57f276..7d578407c 100644
--- a/cpp/velox/substrait/SubstraitToVeloxExpr.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxExpr.cc
@@ -276,7 +276,7 @@ core::TypedExprPtr SubstraitVeloxExprConverter::toLambdaExpr(
SubstraitParser::findVeloxFunction(functionMap_,
arg.scalar_function().function_reference());
CHECK_EQ(veloxFunction, "namedlambdavariable");
argumentNames.emplace_back(arg.scalar_function().arguments(0).value().literal().string());
- argumentTypes.emplace_back(SubstraitParser::parseType(substraitFunc.output_type()));
+ argumentTypes.emplace_back(SubstraitParser::parseType(arg.scalar_function().output_type()));
}
auto rowType = ROW(std::move(argumentNames), std::move(argumentTypes));
// Arg[0] -> function.
diff --git a/docs/velox-backend-support-progress.md b/docs/velox-backend-support-progress.md
index b9b3275d4..5e81081b7 100644
--- a/docs/velox-backend-support-progress.md
+++ b/docs/velox-backend-support-progress.md
@@ -279,7 +279,7 @@ Gluten supports 199 functions. (Drag to right to see all data types)
| exists | |
| | | | | | | |
| | | | | | | | |
| | | |
| explode, explode_outer | |
| | | | | | | |
| | | | | | | | |
| | | |
| explode_outer, explode | |
| | | | | | | |
| | | | | | | | |
| | | |
-| filter | filter | filter
| | | | | | | |
| | | | | | | | |
| | | |
+| filter | filter | filter
| S | Lambda with index argument not supported | | |
| | | | | | | | | |
| | | | | |
| flatten | flatten |
| | | | | | | |
| | | | | | | | |
| | | |
| map | map | map
| S | | | | | | |
| | | | | | | | |
| | | |
| map_concat | map_concat |
| | | | | | | |
| | | | | | | | |
| | | |
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 11f02dede..faa5e9fd5 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -220,6 +220,15 @@ trait SparkPlanExecApi {
throw new GlutenNotSupportException("map_entries is not supported")
}
+ /** Transform array filter to Substrait. */
+ def genArrayFilterTransformer(
+ substraitExprName: String,
+ argument: ExpressionTransformer,
+ function: ExpressionTransformer,
+ expr: ArrayFilter): ExpressionTransformer = {
+ throw new GlutenNotSupportException("filter(on array) is not supported")
+ }
+
/** Transform inline to Substrait. */
def genInlineTransformer(
substraitExprName: String,
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index d3ac729bc..a72be3973 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -576,6 +576,13 @@ object ExpressionConverter extends SQLConfHelper with Logging {
e.getTransformer(childrenTransformers)
case u: Uuid =>
BackendsApiManager.getSparkPlanExecApiInstance.genUuidTransformer(substraitExprName,
u)
+ case f: ArrayFilter =>
+ BackendsApiManager.getSparkPlanExecApiInstance.genArrayFilterTransformer(
+ substraitExprName,
+ replaceWithExpressionTransformerInternal(f.argument, attributeSeq,
expressionsMap),
+ replaceWithExpressionTransformerInternal(f.function, attributeSeq,
expressionsMap),
+ f
+ )
case expr =>
GenericExpressionTransformer(
substraitExprName,
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index 79372cf88..82a884379 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -227,6 +227,7 @@ object ExpressionMappings {
Sig[ArrayExcept](ARRAY_EXCEPT),
Sig[ArrayRepeat](ARRAY_REPEAT),
Sig[ArrayRemove](ARRAY_REMOVE),
+ Sig[ArrayFilter](FILTER),
// Map functions
Sig[CreateMap](CREATE_MAP),
Sig[GetMapValue](GET_MAP_VALUE),
diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index a6653d461..6365e6c70 100644
--- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -242,6 +242,7 @@ object ExpressionNames {
final val ARRAY_EXCEPT = "array_except"
final val ARRAY_REPEAT = "array_repeat"
final val ARRAY_REMOVE = "array_remove"
+ final val FILTER = "filter"
// Map functions
final val CREATE_MAP = "map"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]