This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 02b2239 [SPARK-38286][SQL] Union's maxRows and maxRowsPerPartition
may overflow
02b2239 is described below
commit 02b2239e7cf1291d2f626e6c2f1912001b340763
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Feb 24 10:49:52 2022 +0800
[SPARK-38286][SQL] Union's maxRows and maxRowsPerPartition may overflow
### What changes were proposed in this pull request?
Check for overflow when computing Union's maxRows and maxRowsPerPartition, returning None instead of a negative (wrapped) value.
### Why are the changes needed?
Union's maxRows and maxRowsPerPartition may overflow:
case 1:
```
scala> val df1 = spark.range(0, Long.MaxValue, 1, 1)
df1: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val df2 = spark.range(0, 100, 1, 10)
df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val union = df1.union(df2)
union: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> union.queryExecution.logical.maxRowsPerPartition
res19: Option[Long] = Some(-9223372036854775799)
scala> union.queryExecution.logical.maxRows
res20: Option[Long] = Some(-9223372036854775709)
```
case 2:
```
scala> val n = 2000000
n: Int = 2000000
scala> val df1 = spark.range(0, n, 1, 1).selectExpr("id % 5 as key1", "id
as value1")
df1: org.apache.spark.sql.DataFrame = [key1: bigint, value1: bigint]
scala> val df2 = spark.range(0, n, 1, 2).selectExpr("id % 3 as key2", "id
as value2")
df2: org.apache.spark.sql.DataFrame = [key2: bigint, value2: bigint]
scala> val df3 = spark.range(0, n, 1, 3).selectExpr("id % 4 as key3", "id
as value3")
df3: org.apache.spark.sql.DataFrame = [key3: bigint, value3: bigint]
scala> val joined = df1.join(df2, col("key1") === col("key2")).join(df3,
col("key1") === col("key3"))
joined: org.apache.spark.sql.DataFrame = [key1: bigint, value1: bigint ...
4 more fields]
scala> val unioned = joined.select(col("key1"),
col("value3")).union(joined.select(col("key1"), col("value2")))
unioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key1:
bigint, value3: bigint]
scala> unioned.queryExecution.optimizedPlan.maxRows
res32: Option[Long] = Some(-2446744073709551616)
scala> unioned.queryExecution.optimizedPlan.maxRows
res33: Option[Long] = Some(-2446744073709551616)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test suite.
Closes #35609 from zhengruifeng/union_maxRows_validate.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 683bc46ff9a791ab6b9cd3cb95be6bbc368121e0)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../plans/logical/basicLogicalOperators.scala | 30 ++++++++++++++++------
.../sql/catalyst/plans/LogicalPlanSuite.scala | 8 ++++++
2 files changed, 30 insertions(+), 8 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6748db5..687bf19 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -277,11 +277,18 @@ case class Union(
assert(!allowMissingCol || byName, "`allowMissingCol` can be true only if
`byName` is true.")
override def maxRows: Option[Long] = {
- if (children.exists(_.maxRows.isEmpty)) {
- None
- } else {
- Some(children.flatMap(_.maxRows).sum)
+ var sum = BigInt(0)
+ children.foreach { child =>
+ if (child.maxRows.isDefined) {
+ sum += child.maxRows.get
+ if (!sum.isValidLong) {
+ return None
+ }
+ } else {
+ return None
+ }
}
+ Some(sum.toLong)
}
final override val nodePatterns: Seq[TreePattern] = Seq(UNION)
@@ -290,11 +297,18 @@ case class Union(
* Note the definition has assumption about how union is implemented
physically.
*/
override def maxRowsPerPartition: Option[Long] = {
- if (children.exists(_.maxRowsPerPartition.isEmpty)) {
- None
- } else {
- Some(children.flatMap(_.maxRowsPerPartition).sum)
+ var sum = BigInt(0)
+ children.foreach { child =>
+ if (child.maxRowsPerPartition.isDefined) {
+ sum += child.maxRowsPerPartition.get
+ if (!sum.isValidLong) {
+ return None
+ }
+ } else {
+ return None
+ }
}
+ Some(sum.toLong)
}
def duplicateResolved: Boolean = {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index 0cd6d81..acb41b0 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -105,4 +105,12 @@ class LogicalPlanSuite extends SparkFunSuite {
assert(Range(0, 100, 1, 3).select('id).maxRowsPerPartition === Some(34))
assert(Range(0, 100, 1, 3).where('id % 2 === 1).maxRowsPerPartition ===
Some(34))
}
+
+ test("SPARK-38286: Union's maxRows and maxRowsPerPartition may overflow") {
+ val query1 = Range(0, Long.MaxValue, 1, 1)
+ val query2 = Range(0, 100, 1, 10)
+ val query = query1.union(query2)
+ assert(query.maxRows.isEmpty)
+ assert(query.maxRowsPerPartition.isEmpty)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]