This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new fea47053c [GLUTEN-5618][CH] Fix 'Position x is out of bound in Block'
error when executing count distinct (#5619)
fea47053c is described below
commit fea47053c9716ad1cc2433403312b348889ca959
Author: Zhichao Zhang <[email protected]>
AuthorDate: Tue May 7 09:43:12 2024 +0800
[GLUTEN-5618][CH] Fix 'Position x is out of bound in Block' error when
executing count distinct (#5619)
When executing count distinct where the group-by keys also appear in the count
distinct expression, the query fails with a 'Position x is out of bound in Block'
error or a core dump.
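Root cause:
The CH backend removes duplicated columns when executing the pipeline, so column
positions computed while the duplicates were still present can point past the end
of the block.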
Close #5618.
---
.../clickhouse/CHSparkPlanExecApi.scala | 7 +-
.../GlutenClickhouseCountDistinctSuite.scala | 98 ++++++++++++++++++++++
.../extension/CountDistinctWithoutExpand.scala | 6 +-
3 files changed, 107 insertions(+), 4 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index ee8b7dd45..64090af28 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -194,12 +194,13 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
child: SparkPlan): HashAggregateExecBaseTransformer =
CHHashAggregateExecTransformer(
requiredChildDistributionExpressions,
- groupingExpressions,
+ groupingExpressions.distinct,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
- resultExpressions,
- child)
+ resultExpressions.distinct,
+ child
+ )
/** Generate HashAggregateExecPullOutHelper */
override def genHashAggregateExecPullOutHelper(
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
index b12f886e5..1b954df22 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
@@ -115,4 +115,102 @@ class GlutenClickhouseCountDistinctSuite extends
GlutenClickHouseWholeStageTrans
"values (0, null,1), (0,null,1), (1, 1,1), (2, 2, 1) ,(2,2,2),(3,3,3) as
data(a,b,c)"
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
+
+ test(
+ "Gluten-5618: [CH] Fix 'Position x is out of bound in Block' error " +
+ "when executing count distinct") {
+
+ withSQLConf(("spark.gluten.sql.countDistinctWithoutExpand", "false")) {
+ val sql =
+ """
+ |select count(distinct a, b, c) from
+ |values (0, null, 1), (1, 1, 1), (2, 2, 1), (1, 2, 1) ,(2, 2, 2) as
data(a,b,c) group by c
+ |""".stripMargin
+
+ compareResultsAgainstVanillaSpark(
+ sql,
+ true,
+ {
+ df =>
+ {
+
+ val planExecs = df.queryExecution.executedPlan.collect {
+ case aggTransformer: HashAggregateExecBaseTransformer =>
aggTransformer
+ }
+
+ planExecs.head.aggregateExpressions.foreach {
+ expr => assert(expr.toString().startsWith("count("))
+ }
+ planExecs(1).aggregateExpressions.foreach {
+ expr => assert(expr.toString().startsWith("partial_count("))
+ }
+ }
+ }
+ )
+ }
+
+ val sql =
+ """
+ |select count(distinct a1, a2, a3, a4, a5, a6, a7, a8, a9, a10)
+ |from values
+ |(0, null, 1, 0, null, 1, 0, 5, 1, 0),
+ |(null, 1, 1, null, 1, 1, null, 1, 1, 3),
+ |(2, 2, 1, 2, 2, 1, 2, 2, 1, 2),
+ |(1, 2, null, 1, 2, null, 1, 2, 3, 1),
+ |(2, 2, 2, 2, 2, 2, 2, 2, 2, 2)
+ |as data(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10)
+ |group by a10
+ |""".stripMargin
+
+ compareResultsAgainstVanillaSpark(
+ sql,
+ true,
+ {
+ df =>
+ {
+
+ val planExecs = df.queryExecution.executedPlan.collect {
+ case aggTransformer: HashAggregateExecBaseTransformer =>
aggTransformer
+ }
+
+ planExecs.head.aggregateExpressions.foreach {
+ expr => assert(expr.toString().startsWith("count("))
+ }
+ planExecs(1).aggregateExpressions.foreach {
+ expr => assert(expr.toString().startsWith("partial_count("))
+ }
+ }
+ }
+ )
+
+ val sql1 =
+ """
+ |select count(distinct a, b, c)
+ |from
+ |values (0, null, 1), (1, 1, 1), (null, 2, 1), (1, 2, 1) ,(2, 2, null)
+ |as data(a,b,c)
+ |group by c
+ |""".stripMargin
+
+ compareResultsAgainstVanillaSpark(
+ sql1,
+ true,
+ {
+ df =>
+ {
+
+ val planExecs = df.queryExecution.executedPlan.collect {
+ case aggTransformer: HashAggregateExecBaseTransformer =>
aggTransformer
+ }
+
+ planExecs.head.aggregateExpressions.foreach {
+ expr => assert(expr.toString().startsWith("countdistinct("))
+ }
+ planExecs(1).aggregateExpressions.foreach {
+ expr =>
assert(expr.toString().startsWith("partial_countdistinct("))
+ }
+ }
+ }
+ )
+ }
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
index 43cc68ead..82051baee 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
@@ -36,7 +36,11 @@ object CountDistinctWithoutExpand extends Rule[LogicalPlan] {
GlutenConfig.getConf.enableGluten &&
GlutenConfig.getConf.enableCountDistinctWithoutExpand
) {
plan.transformAllExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION))
{
- case ae: AggregateExpression if ae.isDistinct &&
ae.aggregateFunction.isInstanceOf[Count] =>
+ case ae: AggregateExpression
+ if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] &&
+ // The maximum number of arguments for aggregate function with
Nullable types in CH
+ // backend is 8
+ ae.aggregateFunction.children.size <= 8 =>
ae.copy(
aggregateFunction =
CountDistinct.apply(ae.aggregateFunction.asInstanceOf[Count].children),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]