This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c987bd20a [GLUTEN-4039][VL] Implement stack function (#5813)
c987bd20a is described below

commit c987bd20a3e48fb62370aedee2798f6700bf9775
Author: James Xu <[email protected]>
AuthorDate: Wed May 22 09:59:45 2024 +0800

    [GLUTEN-4039][VL] Implement stack function (#5813)
    
    We insert a ProjectExec before GenerateExec to organize stack's
    params as several arrays. These arrays are then unnested by the
    Unnest operator. For example, for the query:
    
    select stack(2, id, name, id1, name1)
    
    The plan is:
    
    Generate stack(2, id#122, name#123, id1#124, name1#125), false, [col0#137, 
col1#138]
    +- Project [id#122, name#123, id1#124, name1#125, array(id#122, id1#124) AS 
_pre_0#141, array(name#123,name1#125) AS _pre_1#142]
      +- RewrittenNodeWall LocalTableScan [id#122, name#123, id1#124, name1#125]
---
 .../org/apache/gluten/utils/CHExpressionUtil.scala |  3 +-
 .../gluten/execution/GenerateExecTransformer.scala | 46 +++++++++++++++-
 .../org/apache/gluten/execution/TestOperator.scala | 37 +++++++++++++
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        | 64 +++++++++++++++++-----
 docs/velox-backend-support-progress.md             |  4 +-
 .../gluten/expression/ExpressionMappings.scala     |  4 +-
 .../apache/gluten/expression/ExpressionNames.scala |  1 +
 7 files changed, 138 insertions(+), 21 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
index 8454b1469..5f78d25cc 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
@@ -200,6 +200,7 @@ object CHExpressionUtil {
     TIMESTAMP_MILLIS -> DefaultValidator(),
     TIMESTAMP_MICROS -> DefaultValidator(),
     FLATTEN -> DefaultValidator(),
-    RINT -> DefaultValidator()
+    RINT -> DefaultValidator(),
+    STACK -> DefaultValidator()
   )
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
index 8f5782742..23addb89e 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
@@ -109,6 +109,18 @@ case class GenerateExecTransformer(
         .append("isPosExplode=")
         .append(isPosExplode)
         .append("\n")
+
+      // isStack: 1 for Stack, 0 for others.
+      val isStack = if (generator.isInstanceOf[Stack]) {
+        "1"
+      } else {
+        "0"
+      }
+      parametersStr
+        .append("isStack=")
+        .append(isStack)
+        .append("\n")
+
       val message = StringValue
         .newBuilder()
         .setValue(parametersStr.toString)
@@ -128,7 +140,7 @@ object GenerateExecTransformer {
       false
     } else {
       generator match {
-        case _: Inline | _: ExplodeBase | _: JsonTuple =>
+        case _: Inline | _: ExplodeBase | _: JsonTuple | _: Stack =>
           true
         case _ =>
           false
@@ -159,7 +171,7 @@ object PullOutGenerateProjectHelper extends 
PullOutProjectHelper {
           }
           val newGeneratorChildren = Seq(newGeneratorChild)
 
-          // Avoid using elimainateProjectList to create the project list
+          // Avoid using eliminateProjectList to create the project list
           // because newGeneratorChild can be a duplicated Attribute in 
generate.child.output.
           // The native side identifies the last field of projection as 
generator's input.
           generate.copy(
@@ -167,6 +179,36 @@ object PullOutGenerateProjectHelper extends 
PullOutProjectHelper {
               
generate.generator.withNewChildren(newGeneratorChildren).asInstanceOf[Generator],
             child = ProjectExec(generate.child.output ++ newGeneratorChildren, 
generate.child)
           )
+        case stack: Stack =>
+          val numRows = stack.children.head.eval().asInstanceOf[Int]
+          val numFields = Math.ceil((stack.children.size - 1.0) / 
numRows).toInt
+
+          val newProjections = mutable.Buffer[NamedExpression]()
+          val args = stack.children.tail
+
+          // We organize stack's params as `numFields` arrays which will be fed
+          // to the Unnest operator on the native side.
+          for (field <- 0 until numFields) {
+            val fieldArray = mutable.Buffer[Expression]()
+
+            for (row <- 0 until numRows) {
+              val index = row * numFields + field
+              if (index < args.size) {
+                fieldArray += args(index)
+              } else {
+                // Append nulls.
+                fieldArray += Literal(null, args(field).dataType)
+              }
+            }
+
+            newProjections += Alias(CreateArray(fieldArray), 
generatePreAliasName)()
+          }
+
+          // Plug in a Project between Generate and its child.
+          generate.copy(
+            generator = generate.generator,
+            child = ProjectExec(generate.child.output ++ newProjections, 
generate.child)
+          )
         case JsonTuple(Seq(jsonObj, jsonPaths @ _*)) =>
           val getJsons: IndexedSeq[Expression] = {
             jsonPaths.map {
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 0872ac798..287bf1e9b 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -803,6 +803,43 @@ class TestOperator extends VeloxWholeStageTransformerSuite 
{
     }
   }
 
+  test("test stack function") {
+    withTempView("t1") {
+      sql("""SELECT * from values
+            |  (1, "james", 10, "lucy"),
+            |  (2, "bond", 20, "lily")
+            |as tbl(id, name, id1, name1)
+         """.stripMargin).createOrReplaceTempView("t1")
+
+      // Stack function with attributes as params.
+      // Stack 4 attributes, no nulls need to be padded.
+      runQueryAndCompare(s"""
+                            |SELECT stack(2, id, name, id1, name1) from t1;
+                            |""".stripMargin) {
+        checkGlutenOperatorMatch[GenerateExecTransformer]
+      }
+
+      // Stack 3 attributes: there will be nulls.
+      runQueryAndCompare(s"""
+                            |SELECT stack(2, id, name, id1) from t1;
+                            |""".stripMargin) {
+        checkGlutenOperatorMatch[GenerateExecTransformer]
+      }
+
+      // Stack function with literals as params.
+      runQueryAndCompare("SELECT stack(2, 1, 2, 3);") {
+        checkGlutenOperatorMatch[GenerateExecTransformer]
+      }
+
+      // Stack function with params mixed with attributes and literals.
+      runQueryAndCompare(s"""
+                            |SELECT stack(2, id, name, 1) from t1;
+                            |""".stripMargin) {
+        checkGlutenOperatorMatch[GenerateExecTransformer]
+      }
+    }
+  }
+
   test("test inline function") {
     // Literal: func(literal)
     runQueryAndCompare(s"""
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 34ba6057c..c07826f5a 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -706,6 +706,23 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   return std::make_shared<core::ExpandNode>(nextPlanNodeId(), projectSetExprs, 
std::move(names), childNode);
 }
 
+namespace {
+
+void extractUnnestFieldExpr(
+    std::shared_ptr<const core::ProjectNode> projNode,
+    int32_t index,
+    std::vector<core::FieldAccessTypedExprPtr>& unnestFields) {
+  auto name = projNode->names()[index];
+  auto expr = projNode->projections()[index];
+  auto type = expr->type();
+
+  auto unnestFieldExpr = std::make_shared<core::FieldAccessTypedExpr>(type, 
name);
+  VELOX_CHECK_NOT_NULL(unnestFieldExpr, " the key in unnest Operator only 
support field");
+  unnestFields.emplace_back(unnestFieldExpr);
+}
+
+} // namespace
+
 core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const 
::substrait::GenerateRel& generateRel) {
   core::PlanNodePtr childNode;
   if (generateRel.has_input()) {
@@ -732,22 +749,39 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
 
   auto projNode = std::dynamic_pointer_cast<const 
core::ProjectNode>(childNode);
 
+  bool isStack = generateRel.has_advanced_extension() &&
+      
SubstraitParser::configSetInOptimization(generateRel.advanced_extension(), 
"isStack=");
+
   if (projNode != nullptr && projNode->names().size() > 
requiredChildOutput.size()) {
-    // Generator function's input is not a field reference, e.g. 
explode(array(1,2,3)), a sample
-    // input substrait plan is like the following(the plan structure is 
ensured by scala code):
-    //
-    //  Generate explode([1,2,3] AS _pre_0#129), false, [col#126]
-    //  +- Project [fake_column#128, [1,2,3] AS _pre_0#129]
-    //   +- RewrittenNodeWall Scan OneRowRelation[fake_column#128]
-    //
-    // The last projection column in GeneratorRel's child(Project) is the 
column we need to unnest
-    auto innerName = projNode->names().back();
-    auto innerExpr = projNode->projections().back();
-
-    auto innerType = innerExpr->type();
-    auto unnestFieldExpr = 
std::make_shared<core::FieldAccessTypedExpr>(innerType, innerName);
-    VELOX_CHECK_NOT_NULL(unnestFieldExpr, " the key in unnest Operator only 
support field");
-    unnest.emplace_back(unnestFieldExpr);
+    // Generator function's input is NOT a field reference.
+    if (!isStack) {
+      // For generator function which is not stack, e.g. 
explode(array(1,2,3)), a sample
+      // input substrait plan is like the following:
+      //
+      //  Generate explode([1,2,3] AS _pre_0#129), false, [col#126]
+      //  +- Project [fake_column#128, [1,2,3] AS _pre_0#129]
+      //   +- RewrittenNodeWall Scan OneRowRelation[fake_column#128]
+      // The last projection column in GeneratorRel's child(Project) is the 
column we need to unnest
+      extractUnnestFieldExpr(projNode, projNode->projections().size() - 1, 
unnest);
+    } else {
+      // For stack function, e.g. stack(2, 1,2,3), a sample
+      // input substrait plan is like the following:
+      //
+      // Generate stack(2, id#122, name#123, id1#124, name1#125), false, 
[col0#137, col1#138]
+      // +- Project [id#122, name#123, id1#124, name1#125, array(id#122, 
id1#124) AS _pre_0#141, array(name#123,
+      // name1#125) AS _pre_1#142]
+      //   +- RewrittenNodeWall LocalTableScan [id#122, name#123, id1#124, 
name1#125]
+      //
+      // The last `numFields` projections are the fields we want to unnest.
+      auto generatorFunc = generator.scalar_function();
+      auto numRows = 
SubstraitParser::getLiteralValue<int32_t>(generatorFunc.arguments(0).value().literal());
+      auto numFields = 
static_cast<int32_t>(std::ceil((generatorFunc.arguments_size() - 1.0) / 
numRows));
+      auto totalProjectCount = projNode->names().size();
+
+      for (auto i = totalProjectCount - numFields; i < totalProjectCount; ++i) 
{
+        extractUnnestFieldExpr(projNode, i, unnest);
+      }
+    }
   } else {
     // Generator function's input is a field reference, e.g. explode(col), 
generator
     // function's first argument is the field reference we need to unnest.
diff --git a/docs/velox-backend-support-progress.md 
b/docs/velox-backend-support-progress.md
index b8acce266..fb68740c7 100644
--- a/docs/velox-backend-support-progress.md
+++ b/docs/velox-backend-support-progress.md
@@ -273,7 +273,7 @@ Gluten supports 199 functions. (Drag to right to see all 
data types)
 | array_sort                    | array_sort             | array_sort          
  | S      |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
 | array_union                   |                        |                     
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
 | arrays_overlap                | array_overlap          |                     
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
-| arrays_zip                    | zip                       |                  
     |  S      |                          |         |      |       |     |      
|       |        |      |           |        |         |      |        |        
  |       |     |        |     |
+| arrays_zip                    | zip                       |                  
     | S      |                          |         |      |       |     |      
|       |        |      |           |        |         |      |        |        
  |       |     |        |     |
 | cardinality                   | cardinality            |                     
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
 | element_at                    | element_at             | element_at          
  | S      |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
S     | S   |        |     |
 | exists                        | any_match              |                     
  | S      |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
@@ -446,6 +446,6 @@ Gluten supports 199 functions. (Drag to right to see all 
data types)
 | sha1                          | sha1                   | sha1                
  | S      |                          |         |      |       |     |      |   
    |        |      |           | S      |         |      |        |          | 
      |     |        |     |
 | sha2                          |                        | sha2                
  | S      |                          |         |      |       |     |      |   
    |        |      |           | S      |         |      |        |          | 
      |     |        |     |
 | spark_partition_id            |                        |                     
  | S      |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
-| stack                         |                        |                     
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
+| stack                         |                        |                     
  | S      |                          | S       | S    | S     | S   | S    | S 
    | S      | S    | S         | S      | S       | S    | S      | S        | 
S     | S   | S      | S   |
 | xxhash64                      | xxhash64               | xxhash64            
  |        |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
 | uuid                          | uuid                   | uuid                
  | S      |                          |         |      |       |     |      |   
    |        |      |           |        |         |      |        |          | 
      |     |        |     |
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index f910c8a98..c734967de 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -276,7 +276,9 @@ object ExpressionMappings {
     Sig[MonotonicallyIncreasingID](MONOTONICALLY_INCREASING_ID),
     Sig[SparkPartitionID](SPARK_PARTITION_ID),
     // Decimal
-    Sig[UnscaledValue](UNSCALED_VALUE)
+    Sig[UnscaledValue](UNSCALED_VALUE),
+    // Generator function
+    Sig[Stack](STACK)
   ) ++ SparkShimLoader.getSparkShims.scalarExpressionMappings
 
   /** Mapping Spark aggregate expression to Substrait function name */
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
 
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index 1b73b2686..eded85e06 100644
--- 
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++ 
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -294,6 +294,7 @@ object ExpressionNames {
   final val AGGREGATE = "aggregate"
   final val LAMBDAFUNCTION = "lambdafunction"
   final val EXPLODE = "explode"
+  final val STACK = "stack"
   final val INLINE = "inline"
   final val POSEXPLODE = "posexplode"
   final val CHECK_OVERFLOW = "check_overflow"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to