This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c987bd20a [GLUTEN-4039][VL] Implement stack function (#5813)
c987bd20a is described below
commit c987bd20a3e48fb62370aedee2798f6700bf9775
Author: James Xu <[email protected]>
AuthorDate: Wed May 22 09:59:45 2024 +0800
[GLUTEN-4039][VL] Implement stack function (#5813)
We insert a ProjectExec before GenerateExec to organize stack's
params as several arrays; these arrays are then unnested
using the Unnest operator. For the query:
select stack(2, id, name, id1, name1)
The plan is:
Generate stack(2, id#122, name#123, id1#124, name1#125), false, [col0#137,
col1#138]
+- Project [id#122, name#123, id1#124, name1#125, array(id#122, id1#124) AS
_pre_0#141, array(name#123,name1#125) AS _pre_1#142]
+- RewrittenNodeWall LocalTableScan [id#122, name#123, id1#124, name1#125]
---
.../org/apache/gluten/utils/CHExpressionUtil.scala | 3 +-
.../gluten/execution/GenerateExecTransformer.scala | 46 +++++++++++++++-
.../org/apache/gluten/execution/TestOperator.scala | 37 +++++++++++++
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 64 +++++++++++++++++-----
docs/velox-backend-support-progress.md | 4 +-
.../gluten/expression/ExpressionMappings.scala | 4 +-
.../apache/gluten/expression/ExpressionNames.scala | 1 +
7 files changed, 138 insertions(+), 21 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
index 8454b1469..5f78d25cc 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala
@@ -200,6 +200,7 @@ object CHExpressionUtil {
TIMESTAMP_MILLIS -> DefaultValidator(),
TIMESTAMP_MICROS -> DefaultValidator(),
FLATTEN -> DefaultValidator(),
- RINT -> DefaultValidator()
+ RINT -> DefaultValidator(),
+ STACK -> DefaultValidator()
)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
index 8f5782742..23addb89e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
@@ -109,6 +109,18 @@ case class GenerateExecTransformer(
.append("isPosExplode=")
.append(isPosExplode)
.append("\n")
+
+ // isStack: 1 for Stack, 0 for others.
+ val isStack = if (generator.isInstanceOf[Stack]) {
+ "1"
+ } else {
+ "0"
+ }
+ parametersStr
+ .append("isStack=")
+ .append(isStack)
+ .append("\n")
+
val message = StringValue
.newBuilder()
.setValue(parametersStr.toString)
@@ -128,7 +140,7 @@ object GenerateExecTransformer {
false
} else {
generator match {
- case _: Inline | _: ExplodeBase | _: JsonTuple =>
+ case _: Inline | _: ExplodeBase | _: JsonTuple | _: Stack =>
true
case _ =>
false
@@ -159,7 +171,7 @@ object PullOutGenerateProjectHelper extends
PullOutProjectHelper {
}
val newGeneratorChildren = Seq(newGeneratorChild)
- // Avoid using elimainateProjectList to create the project list
+ // Avoid using eliminateProjectList to create the project list
// because newGeneratorChild can be a duplicated Attribute in
generate.child.output.
// The native side identifies the last field of projection as
generator's input.
generate.copy(
@@ -167,6 +179,36 @@ object PullOutGenerateProjectHelper extends
PullOutProjectHelper {
generate.generator.withNewChildren(newGeneratorChildren).asInstanceOf[Generator],
child = ProjectExec(generate.child.output ++ newGeneratorChildren,
generate.child)
)
+ case stack: Stack =>
+ val numRows = stack.children.head.eval().asInstanceOf[Int]
+ val numFields = Math.ceil((stack.children.size - 1.0) /
numRows).toInt
+
+ val newProjections = mutable.Buffer[NamedExpression]()
+ val args = stack.children.tail
+
+ // We organize stack's params as `numFields` arrays which will be
fed
+ // to Unnest operator on native side.
+ for (field <- 0 until numFields) {
+ val fieldArray = mutable.Buffer[Expression]()
+
+ for (row <- 0 until numRows) {
+ val index = row * numFields + field
+ if (index < args.size) {
+ fieldArray += args(index)
+ } else {
+ // Append nulls.
+ fieldArray += Literal(null, args(field).dataType)
+ }
+ }
+
+ newProjections += Alias(CreateArray(fieldArray),
generatePreAliasName)()
+ }
+
+ // Plug in a Project between Generate and its child.
+ generate.copy(
+ generator = generate.generator,
+ child = ProjectExec(generate.child.output ++ newProjections,
generate.child)
+ )
case JsonTuple(Seq(jsonObj, jsonPaths @ _*)) =>
val getJsons: IndexedSeq[Expression] = {
jsonPaths.map {
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 0872ac798..287bf1e9b 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -803,6 +803,43 @@ class TestOperator extends VeloxWholeStageTransformerSuite
{
}
}
+ test("test stack function") {
+ withTempView("t1") {
+ sql("""SELECT * from values
+ | (1, "james", 10, "lucy"),
+ | (2, "bond", 20, "lily")
+ |as tbl(id, name, id1, name1)
+ """.stripMargin).createOrReplaceTempView("t1")
+
+ // Stack function with attributes as params.
+ // Stack 4 attributes, no nulls need to be padded.
+ runQueryAndCompare(s"""
+ |SELECT stack(2, id, name, id1, name1) from t1;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[GenerateExecTransformer]
+ }
+
+ // Stack 3 attributes: there will be nulls.
+ runQueryAndCompare(s"""
+ |SELECT stack(2, id, name, id1) from t1;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[GenerateExecTransformer]
+ }
+
+ // Stack function with literals as params.
+ runQueryAndCompare("SELECT stack(2, 1, 2, 3);") {
+ checkGlutenOperatorMatch[GenerateExecTransformer]
+ }
+
+ // Stack function with params mixed with attributes and literals.
+ runQueryAndCompare(s"""
+ |SELECT stack(2, id, name, 1) from t1;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[GenerateExecTransformer]
+ }
+ }
+ }
+
test("test inline function") {
// Literal: func(literal)
runQueryAndCompare(s"""
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 34ba6057c..c07826f5a 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -706,6 +706,23 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
return std::make_shared<core::ExpandNode>(nextPlanNodeId(), projectSetExprs,
std::move(names), childNode);
}
+namespace {
+
+void extractUnnestFieldExpr(
+ std::shared_ptr<const core::ProjectNode> projNode,
+ int32_t index,
+ std::vector<core::FieldAccessTypedExprPtr>& unnestFields) {
+ auto name = projNode->names()[index];
+ auto expr = projNode->projections()[index];
+ auto type = expr->type();
+
+ auto unnestFieldExpr = std::make_shared<core::FieldAccessTypedExpr>(type,
name);
+ VELOX_CHECK_NOT_NULL(unnestFieldExpr, " the key in unnest Operator only
support field");
+ unnestFields.emplace_back(unnestFieldExpr);
+}
+
+} // namespace
+
core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const
::substrait::GenerateRel& generateRel) {
core::PlanNodePtr childNode;
if (generateRel.has_input()) {
@@ -732,22 +749,39 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
auto projNode = std::dynamic_pointer_cast<const
core::ProjectNode>(childNode);
+ bool isStack = generateRel.has_advanced_extension() &&
+
SubstraitParser::configSetInOptimization(generateRel.advanced_extension(),
"isStack=");
+
if (projNode != nullptr && projNode->names().size() >
requiredChildOutput.size()) {
- // Generator function's input is not a field reference, e.g.
explode(array(1,2,3)), a sample
- // input substrait plan is like the following(the plan structure is
ensured by scala code):
- //
- // Generate explode([1,2,3] AS _pre_0#129), false, [col#126]
- // +- Project [fake_column#128, [1,2,3] AS _pre_0#129]
- // +- RewrittenNodeWall Scan OneRowRelation[fake_column#128]
- //
- // The last projection column in GeneratorRel's child(Project) is the
column we need to unnest
- auto innerName = projNode->names().back();
- auto innerExpr = projNode->projections().back();
-
- auto innerType = innerExpr->type();
- auto unnestFieldExpr =
std::make_shared<core::FieldAccessTypedExpr>(innerType, innerName);
- VELOX_CHECK_NOT_NULL(unnestFieldExpr, " the key in unnest Operator only
support field");
- unnest.emplace_back(unnestFieldExpr);
+ // Generator function's input is NOT a field reference.
+ if (!isStack) {
+ // For generator function which is not stack, e.g.
explode(array(1,2,3)), a sample
+ // input substrait plan is like the following:
+ //
+ // Generate explode([1,2,3] AS _pre_0#129), false, [col#126]
+ // +- Project [fake_column#128, [1,2,3] AS _pre_0#129]
+ // +- RewrittenNodeWall Scan OneRowRelation[fake_column#128]
+ // The last projection column in GeneratorRel's child(Project) is the
column we need to unnest
+ extractUnnestFieldExpr(projNode, projNode->projections().size() - 1,
unnest);
+ } else {
+ // For stack function, e.g. stack(2, 1,2,3), a sample
+ // input substrait plan is like the following:
+ //
+ // Generate stack(2, id#122, name#123, id1#124, name1#125), false,
[col0#137, col1#138]
+ // +- Project [id#122, name#123, id1#124, name1#125, array(id#122,
id1#124) AS _pre_0#141, array(name#123,
+ // name1#125) AS _pre_1#142]
+ // +- RewrittenNodeWall LocalTableScan [id#122, name#123, id1#124,
name1#125]
+ //
+ // The last `numFields` projections are the fields we want to unnest.
+ auto generatorFunc = generator.scalar_function();
+ auto numRows =
SubstraitParser::getLiteralValue<int32_t>(generatorFunc.arguments(0).value().literal());
+ auto numFields =
static_cast<int32_t>(std::ceil((generatorFunc.arguments_size() - 1.0) /
numRows));
+ auto totalProjectCount = projNode->names().size();
+
+ for (auto i = totalProjectCount - numFields; i < totalProjectCount; ++i)
{
+ extractUnnestFieldExpr(projNode, i, unnest);
+ }
+ }
} else {
// Generator function's input is a field reference, e.g. explode(col),
generator
// function's first argument is the field reference we need to unnest.
diff --git a/docs/velox-backend-support-progress.md
b/docs/velox-backend-support-progress.md
index b8acce266..fb68740c7 100644
--- a/docs/velox-backend-support-progress.md
+++ b/docs/velox-backend-support-progress.md
@@ -273,7 +273,7 @@ Gluten supports 199 functions. (Drag to right to see all
data types)
| array_sort | array_sort | array_sort
| S | | | | | | |
| | | | | | | | |
| | | |
| array_union | |
| | | | | | | |
| | | | | | | | |
| | | |
| arrays_overlap | array_overlap |
| | | | | | | |
| | | | | | | | |
| | | |
-| arrays_zip | zip |
| S | | | | | |
| | | | | | | | |
| | | | |
+| arrays_zip | zip |
| S | | | | | |
| | | | | | | | |
| | | | |
| cardinality | cardinality |
| | | | | | | |
| | | | | | | | |
| | | |
| element_at | element_at | element_at
| S | | | | | | |
| | | | | | | | |
S | S | | |
| exists | any_match |
| S | | | | | | |
| | | | | | | | |
| | | |
@@ -446,6 +446,6 @@ Gluten supports 199 functions. (Drag to right to see all
data types)
| sha1 | sha1 | sha1
| S | | | | | | |
| | | | S | | | | |
| | | |
| sha2 | | sha2
| S | | | | | | |
| | | | S | | | | |
| | | |
| spark_partition_id | |
| S | | | | | | |
| | | | | | | | |
| | | |
-| stack | |
| | | | | | | |
| | | | | | | | |
| | | |
+| stack | |
| S | | S | S | S | S | S | S
| S | S | S | S | S | S | S | S |
S | S | S | S |
| xxhash64 | xxhash64 | xxhash64
| | | | | | | |
| | | | | | | | |
| | | |
| uuid | uuid | uuid
| S | | | | | | |
| | | | | | | | |
| | | |
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index f910c8a98..c734967de 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -276,7 +276,9 @@ object ExpressionMappings {
Sig[MonotonicallyIncreasingID](MONOTONICALLY_INCREASING_ID),
Sig[SparkPartitionID](SPARK_PARTITION_ID),
// Decimal
- Sig[UnscaledValue](UNSCALED_VALUE)
+ Sig[UnscaledValue](UNSCALED_VALUE),
+ // Generator function
+ Sig[Stack](STACK)
) ++ SparkShimLoader.getSparkShims.scalarExpressionMappings
/** Mapping Spark aggregate expression to Substrait function name */
diff --git
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index 1b73b2686..eded85e06 100644
---
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -294,6 +294,7 @@ object ExpressionNames {
final val AGGREGATE = "aggregate"
final val LAMBDAFUNCTION = "lambdafunction"
final val EXPLODE = "explode"
+ final val STACK = "stack"
final val INLINE = "inline"
final val POSEXPLODE = "posexplode"
final val CHECK_OVERFLOW = "check_overflow"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]