This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b274a9072 [GLUTEN-4039][VL] Add array filter function support (#5334)
b274a9072 is described below

commit b274a9072795eed3fdb34e3d65cd1007a76ce2b6
Author: Tengfei Huang <[email protected]>
AuthorDate: Wed Apr 10 16:21:10 2024 +0800

    [GLUTEN-4039][VL] Add array filter function support (#5334)
    
    [GLUTEN-4039][VL] Add array filter function support.
---
 .../backendsapi/velox/SparkPlanExecApiImpl.scala     | 16 +++++++++++++++-
 .../execution/ScalarFunctionsValidateSuite.scala     | 20 ++++++++++++++++++++
 cpp/velox/substrait/SubstraitToVeloxExpr.cc          |  2 +-
 docs/velox-backend-support-progress.md               |  2 +-
 .../apache/gluten/backendsapi/SparkPlanExecApi.scala |  9 +++++++++
 .../gluten/expression/ExpressionConverter.scala      |  7 +++++++
 .../gluten/expression/ExpressionMappings.scala       |  1 +
 .../apache/gluten/expression/ExpressionNames.scala   |  1 +
 8 files changed, 55 insertions(+), 3 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
index 82c4bdde8..715a2eeca 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/SparkPlanExecApiImpl.scala
@@ -37,7 +37,7 @@ import 
org.apache.spark.sql.catalyst.{AggregateFunctionRewriteRule, FlushableHas
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, 
Cast, CreateNamedStruct, ElementAt, Expression, ExpressionInfo, Generator, 
GetArrayItem, GetMapValue, GetStructField, If, IsNaN, Literal, Murmur3Hash, 
NamedExpression, NaNvl, PosExplode, Round, SortOrder, StringSplit, StringTrim, 
Uuid}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayFilter, 
Ascending, Attribute, Cast, CreateNamedStruct, ElementAt, Expression, 
ExpressionInfo, Generator, GetArrayItem, GetMapValue, GetStructField, If, 
IsNaN, LambdaFunction, Literal, Murmur3Hash, NamedExpression, NaNvl, 
PosExplode, Round, SortOrder, StringSplit, StringTrim, Uuid}
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
HLLAdapter}
 import org.apache.spark.sql.catalyst.optimizer.BuildSide
 import org.apache.spark.sql.catalyst.plans.JoinType
@@ -147,6 +147,20 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
     GenericExpressionTransformer(substraitExprName, Seq(child), expr)
   }
 
+  /** Transform array filter to Substrait. */
+  override def genArrayFilterTransformer(
+      substraitExprName: String,
+      argument: ExpressionTransformer,
+      function: ExpressionTransformer,
+      expr: ArrayFilter): ExpressionTransformer = {
+    expr.function match {
+      case LambdaFunction(_, arguments, _) if arguments.size == 2 =>
+        throw new GlutenNotSupportException(
+          "filter on array with lambda using index argument is not supported 
yet")
+      case _ => GenericExpressionTransformer(substraitExprName, Seq(argument, 
function), expr)
+    }
+  }
+
   /** Transform posexplode to Substrait. */
   override def genPosExplodeTransformer(
       substraitExprName: String,
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
index d8e789d1c..9cfe44a8c 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
@@ -727,4 +727,24 @@ class ScalarFunctionsValidateSuite extends 
FunctionsValidateTest {
     }
   }
 
+  test("test array filter") {
+    withTempPath {
+      path =>
+        Seq[Seq[Integer]](Seq(1, null, 5, 4), Seq(5, -1, 8, 9, -7, 2), 
Seq.empty, null)
+          .toDF("value")
+          .write
+          .parquet(path.getCanonicalPath)
+
+        
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("array_tbl")
+
+        runQueryAndCompare("select filter(value, x -> x % 2 == 1) as res from 
array_tbl;") {
+          checkGlutenOperatorMatch[ProjectExecTransformer]
+        }
+
+        runQueryAndCompare("select filter(value, x -> x is not null) as res 
from array_tbl;") {
+          checkGlutenOperatorMatch[ProjectExecTransformer]
+        }
+    }
+  }
+
 }
diff --git a/cpp/velox/substrait/SubstraitToVeloxExpr.cc 
b/cpp/velox/substrait/SubstraitToVeloxExpr.cc
index 02d57f276..7d578407c 100644
--- a/cpp/velox/substrait/SubstraitToVeloxExpr.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxExpr.cc
@@ -276,7 +276,7 @@ core::TypedExprPtr 
SubstraitVeloxExprConverter::toLambdaExpr(
         SubstraitParser::findVeloxFunction(functionMap_, 
arg.scalar_function().function_reference());
     CHECK_EQ(veloxFunction, "namedlambdavariable");
     
argumentNames.emplace_back(arg.scalar_function().arguments(0).value().literal().string());
-    
argumentTypes.emplace_back(SubstraitParser::parseType(substraitFunc.output_type()));
+    
argumentTypes.emplace_back(SubstraitParser::parseType(arg.scalar_function().output_type()));
   }
   auto rowType = ROW(std::move(argumentNames), std::move(argumentTypes));
   // Arg[0] -> function.
diff --git a/docs/velox-backend-support-progress.md 
b/docs/velox-backend-support-progress.md
index b9b3275d4..5e81081b7 100644
--- a/docs/velox-backend-support-progress.md
+++ b/docs/velox-backend-support-progress.md
@@ -279,7 +279,7 @@ Gluten supports 199 functions. (Drag to right to see all 
data types)
 | exists                        |                        |                     
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
 | explode, explode_outer        |                        |                     
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
 | explode_outer, explode        |                        |                     
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
-| filter                        | filter                 | filter              
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
+| filter                        | filter                 | filter              
  | S      | Lambda with index argument not supported |         |      |       
|     |      |       |        |      |           |        |         |      |    
    |          |       |     |        |     |
 | flatten                       | flatten                |                     
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
 | map                           | map                    | map                 
  | S      |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
 | map_concat                    | map_concat             |                     
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 11f02dede..faa5e9fd5 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -220,6 +220,15 @@ trait SparkPlanExecApi {
     throw new GlutenNotSupportException("map_entries is not supported")
   }
 
+  /** Transform array filter to Substrait. */
+  def genArrayFilterTransformer(
+      substraitExprName: String,
+      argument: ExpressionTransformer,
+      function: ExpressionTransformer,
+      expr: ArrayFilter): ExpressionTransformer = {
+    throw new GlutenNotSupportException("filter(on array) is not supported")
+  }
+
   /** Transform inline to Substrait. */
   def genInlineTransformer(
       substraitExprName: String,
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index d3ac729bc..a72be3973 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -576,6 +576,13 @@ object ExpressionConverter extends SQLConfHelper with 
Logging {
         e.getTransformer(childrenTransformers)
       case u: Uuid =>
         
BackendsApiManager.getSparkPlanExecApiInstance.genUuidTransformer(substraitExprName,
 u)
+      case f: ArrayFilter =>
+        
BackendsApiManager.getSparkPlanExecApiInstance.genArrayFilterTransformer(
+          substraitExprName,
+          replaceWithExpressionTransformerInternal(f.argument, attributeSeq, 
expressionsMap),
+          replaceWithExpressionTransformerInternal(f.function, attributeSeq, 
expressionsMap),
+          f
+        )
       case expr =>
         GenericExpressionTransformer(
           substraitExprName,
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index 79372cf88..82a884379 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -227,6 +227,7 @@ object ExpressionMappings {
     Sig[ArrayExcept](ARRAY_EXCEPT),
     Sig[ArrayRepeat](ARRAY_REPEAT),
     Sig[ArrayRemove](ARRAY_REMOVE),
+    Sig[ArrayFilter](FILTER),
     // Map functions
     Sig[CreateMap](CREATE_MAP),
     Sig[GetMapValue](GET_MAP_VALUE),
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
 
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index a6653d461..6365e6c70 100644
--- 
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++ 
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -242,6 +242,7 @@ object ExpressionNames {
   final val ARRAY_EXCEPT = "array_except"
   final val ARRAY_REPEAT = "array_repeat"
   final val ARRAY_REMOVE = "array_remove"
+  final val FILTER = "filter"
 
   // Map functions
   final val CREATE_MAP = "map"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to