This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c95d925 [SPARK-33260][SQL] Fix incorrect results from SortExec when sortOrder is Stream
c95d925 is described below
commit c95d925a9add139e87c7b9f5c95571fdb034f724
Author: Ankur Dave <[email protected]>
AuthorDate: Tue Oct 27 13:20:22 2020 -0700
[SPARK-33260][SQL] Fix incorrect results from SortExec when sortOrder is Stream
### What changes were proposed in this pull request?
The following query produces incorrect results. The query has two essential
features: (1) it contains a string aggregate, resulting in a `SortExec` node,
and (2) it contains a duplicate grouping key, causing
`RemoveRepetitionFromGroupExpressions` to produce a sort order stored as a
`Stream`.
```sql
SELECT bigint_col_1, bigint_col_9, MAX(CAST(bigint_col_1 AS string))
FROM table_4
GROUP BY bigint_col_1, bigint_col_9, bigint_col_9
```
When the sort order is stored as a `Stream`, the line
`ordering.map(_.child.genCode(ctx))` in `GenerateOrdering#createOrderKeys()`
has unpredictable side effects on `ctx`, because `genCode(ctx)` modifies
`ctx`. When `ordering` is a `Stream`, `map` applies the function to the first
element immediately, but the modifications for the remaining elements do not
happen immediately as intended. Instead, they occur lazily whenever the
returned `Stream` is traversed later.
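The following minimal, self-contained Scala sketch makes the laziness concrete without depending on Spark. `FakeCtx`, `StreamLaziness`, `lazyKeys`, and `eagerKeys` are hypothetical stand-ins for `CodegenContext`, `genCode`, and the before/after versions of the line in `createOrderKeys()`. The sketch relies only on the standard-library behavior that `Stream.map` evaluates the first element right away and defers the rest until the result is traversed.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's CodegenContext: it only records the
// order in which "code generation" touched each expression.
final class FakeCtx {
  val generated: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

object StreamLaziness {
  // Hypothetical analogue of `expr.genCode(ctx)`: returns a value and, as a
  // side effect, mutates the shared context.
  def genCode(expr: String, ctx: FakeCtx): String = {
    ctx.generated += expr
    s"code($expr)"
  }

  // Mirrors the original line: if `ordering` is a Stream at runtime, only
  // the head is mapped now and the tail's side effects are deferred.
  def lazyKeys(ordering: Seq[String], ctx: FakeCtx): Seq[String] =
    ordering.map(genCode(_, ctx))

  // Mirrors the fix: materialize into an IndexedSeq first, so `map` runs
  // genCode on every element immediately.
  def eagerKeys(ordering: Seq[String], ctx: FakeCtx): Seq[String] =
    ordering.toIndexedSeq.map(genCode(_, ctx))
}
```

For example, with `ordering = Stream("a", "b", "c")`, `lazyKeys` leaves only `a` recorded in the context until the returned `Stream` is traversed, whereas `eagerKeys` records all three expressions before it returns.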
Similar bugs have occurred at least three times in the past:
https://issues.apache.org/jira/browse/SPARK-24500,
https://issues.apache.org/jira/browse/SPARK-25767,
https://issues.apache.org/jira/browse/SPARK-26680.
The fix is to convert `ordering` with `toIndexedSeq` before mapping over it.
This forces the modifications to happen immediately even when `ordering` is a
`Stream`.
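Deferred mutations produce wrong results, not just late ones, when the context changes before the `Stream` is forced. This sketch uses a hypothetical `RowCtx`, whose `inputRow` field plays the role of `ctx.INPUT_ROW`, and a simplified `orderKeys` loosely modeled on `createOrderKeys()`. Neither is Spark's actual signature. Generating keys for two rows in succession shows that, without materializing `ordering`, the deferred keys for the first row read the second row's input.

```scala
// Hypothetical mutable context whose `inputRow` plays the role of
// `ctx.INPUT_ROW` in the diff.
final class RowCtx {
  var inputRow: String = ""
}

object StaleInputRow {
  // Hypothetical genCode: the generated snippet reads whatever row the
  // context points at when this function actually runs.
  def genCode(column: String, ctx: RowCtx): String =
    s"${ctx.inputRow}.$column"

  // Simplified analogue of createOrderKeys: set the input row, then
  // generate one key per sort column, either lazily or eagerly.
  def orderKeys(ctx: RowCtx, row: String, ordering: Seq[String], eager: Boolean): Seq[String] = {
    ctx.inputRow = row
    val cols: Seq[String] = if (eager) ordering.toIndexedSeq else ordering
    cols.map(genCode(_, ctx))
  }

  // Generate keys for row A and then row B on the same context, and only
  // afterwards traverse both results.
  def keysFor(ordering: Seq[String], eager: Boolean): (List[String], List[String]) = {
    val ctx = new RowCtx
    val left = orderKeys(ctx, "rowA", ordering, eager)
    val right = orderKeys(ctx, "rowB", ordering, eager)
    (left.toList, right.toList)
  }
}
```

For `Stream("x", "y")`, the lazy path yields `rowA.x` and `rowB.y` as the keys for row A. That mixes two rows in one comparison key. Materializing with `toIndexedSeq` yields `rowA.x` and `rowA.y`.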
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test for `SortExec` where `sortOrder` is a `Stream`. The test
previously failed and now passes.
Closes #30160 from ankurdave/SPARK-33260.
Authored-by: Ankur Dave <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 3f2a2b5fe6ada37ef86f00737387e6cf2496df74)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/catalyst/expressions/codegen/GenerateOrdering.scala | 4 +++-
.../scala/org/apache/spark/sql/execution/SortSuite.scala | 13 +++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 63bd59e..5d00519 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -71,7 +71,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], BaseOrdering] with
ctx.INPUT_ROW = row
// to use INPUT_ROW we must make sure currentVars is null
ctx.currentVars = null
- ordering.map(_.child.genCode(ctx))
+ // SPARK-33260: To avoid unpredictable modifications to `ctx` when `ordering` is a Stream, we
+ // use `toIndexedSeq` to make the transformation eager.
+ ordering.toIndexedSeq.map(_.child.genCode(ctx))
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 7654a9d..6a4f3f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -97,6 +97,19 @@ class SortSuite extends SparkPlanTest with SharedSparkSession {
}
}
+ test("SPARK-33260: sort order is a Stream") {
+ val input = Seq(
+ ("Hello", 4, 2.0),
+ ("Hello", 1, 1.0),
+ ("World", 8, 3.0)
+ )
+ checkAnswer(
+ input.toDF("a", "b", "c"),
+ (child: SparkPlan) => SortExec(Stream('a.asc, 'b.asc, 'c.asc), global = true, child = child),
+ input.sortBy(t => (t._1, t._2, t._3)).map(Row.fromTuple),
+ sortAnswers = false)
+ }
+
// Test sorting on different data types
for (
dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);