This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 02b2239  [SPARK-38286][SQL] Union's maxRows and maxRowsPerPartition may overflow
02b2239 is described below

commit 02b2239e7cf1291d2f626e6c2f1912001b340763
Author: Ruifeng Zheng <ruife...@foxmail.com>
AuthorDate: Thu Feb 24 10:49:52 2022 +0800

    [SPARK-38286][SQL] Union's maxRows and maxRowsPerPartition may overflow
    
    ### What changes were proposed in this pull request?
    Check Union's maxRows and maxRowsPerPartition for arithmetic overflow, and return None instead of a negative count when the sum exceeds Long.MaxValue.
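
    As a minimal standalone sketch of the same overflow guard (the helper name `safeSum` is hypothetical, for illustration only):
    ```
    // Accumulate in BigInt; give up with None once the running sum can no
    // longer be represented as a Long, or when any child bound is unknown.
    def safeSum(bounds: Seq[Option[Long]]): Option[Long] = {
      var sum = BigInt(0)
      bounds.foreach {
        case Some(n) =>
          sum += n
          if (!sum.isValidLong) return None // sum overflowed a Long
        case None => return None // a child bound is unknown
      }
      Some(sum.toLong)
    }

    safeSum(Seq(Some(Long.MaxValue), Some(100L))) // None, not a negative count
    ```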
    
    ### Why are the changes needed?
    Union's maxRows and maxRowsPerPartition may overflow:
    
    case 1:
    ```
    scala> val df1 = spark.range(0, Long.MaxValue, 1, 1)
    df1: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    
    scala> val df2 = spark.range(0, 100, 1, 10)
    df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    
    scala> val union = df1.union(df2)
    union: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    
    scala> union.queryExecution.logical.maxRowsPerPartition
    res19: Option[Long] = Some(-9223372036854775799)
    
    scala> union.queryExecution.logical.maxRows
    res20: Option[Long] = Some(-9223372036854775709)
    ```
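
    The reported values are plain two's-complement wraparound of the naive Long sums; a quick check reproducing the numbers above:
    ```
    Long.MaxValue + 10L  // -9223372036854775799, the maxRowsPerPartition above
                         // (Long.MaxValue rows in df1's one partition + 10 rows per df2 partition)
    Long.MaxValue + 100L // -9223372036854775709, the maxRows above (Long.MaxValue + 100 total rows)
    ```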
    
    case 2:
    ```
    scala> val n = 2000000
    n: Int = 2000000
    
    scala> val df1 = spark.range(0, n, 1, 1).selectExpr("id % 5 as key1", "id as value1")
    df1: org.apache.spark.sql.DataFrame = [key1: bigint, value1: bigint]

    scala> val df2 = spark.range(0, n, 1, 2).selectExpr("id % 3 as key2", "id as value2")
    df2: org.apache.spark.sql.DataFrame = [key2: bigint, value2: bigint]

    scala> val df3 = spark.range(0, n, 1, 3).selectExpr("id % 4 as key3", "id as value3")
    df3: org.apache.spark.sql.DataFrame = [key3: bigint, value3: bigint]

    scala> val joined = df1.join(df2, col("key1") === col("key2")).join(df3, col("key1") === col("key3"))
    joined: org.apache.spark.sql.DataFrame = [key1: bigint, value1: bigint ... 4 more fields]

    scala> val unioned = joined.select(col("key1"), col("value3")).union(joined.select(col("key1"), col("value2")))
    unioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key1: bigint, value3: bigint]
    
    scala> unioned.queryExecution.optimizedPlan.maxRows
    res32: Option[Long] = Some(-2446744073709551616)
    ```
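
    The arithmetic behind case 2: each joined branch has a maxRows bound of 2000000^3 = 8e18 rows, which still fits in a Long, but the union of the two branches does not:
    ```
    val perBranch = 2000000L * 2000000L * 2000000L // 8000000000000000000, a valid Long
    perBranch + perBranch                          // -2446744073709551616, wraps past Long.MaxValue
    ```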
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a test case in LogicalPlanSuite.
    
    Closes #35609 from zhengruifeng/union_maxRows_validate.
    
    Authored-by: Ruifeng Zheng <ruife...@foxmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 683bc46ff9a791ab6b9cd3cb95be6bbc368121e0)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../plans/logical/basicLogicalOperators.scala      | 30 ++++++++++++++++------
 .../sql/catalyst/plans/LogicalPlanSuite.scala      |  8 ++++++
 2 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6748db5..687bf19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -277,11 +277,18 @@ case class Union(
   assert(!allowMissingCol || byName, "`allowMissingCol` can be true only if `byName` is true.")
 
   override def maxRows: Option[Long] = {
-    if (children.exists(_.maxRows.isEmpty)) {
-      None
-    } else {
-      Some(children.flatMap(_.maxRows).sum)
+    var sum = BigInt(0)
+    children.foreach { child =>
+      if (child.maxRows.isDefined) {
+        sum += child.maxRows.get
+        if (!sum.isValidLong) {
+          return None
+        }
+      } else {
+        return None
+      }
     }
+    Some(sum.toLong)
   }
 
   final override val nodePatterns: Seq[TreePattern] = Seq(UNION)
@@ -290,11 +297,18 @@ case class Union(
    * Note the definition has assumption about how union is implemented physically.
    */
   override def maxRowsPerPartition: Option[Long] = {
-    if (children.exists(_.maxRowsPerPartition.isEmpty)) {
-      None
-    } else {
-      Some(children.flatMap(_.maxRowsPerPartition).sum)
+    var sum = BigInt(0)
+    children.foreach { child =>
+      if (child.maxRowsPerPartition.isDefined) {
+        sum += child.maxRowsPerPartition.get
+        if (!sum.isValidLong) {
+          return None
+        }
+      } else {
+        return None
+      }
     }
+    Some(sum.toLong)
   }
 
   def duplicateResolved: Boolean = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index 0cd6d81..acb41b0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -105,4 +105,12 @@ class LogicalPlanSuite extends SparkFunSuite {
     assert(Range(0, 100, 1, 3).select('id).maxRowsPerPartition === Some(34))
     assert(Range(0, 100, 1, 3).where('id % 2 === 1).maxRowsPerPartition === Some(34))
   }
+
+  test("SPARK-38286: Union's maxRows and maxRowsPerPartition may overflow") {
+    val query1 = Range(0, Long.MaxValue, 1, 1)
+    val query2 = Range(0, 100, 1, 10)
+    val query = query1.union(query2)
+    assert(query.maxRows.isEmpty)
+    assert(query.maxRowsPerPartition.isEmpty)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
