This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 08bdc9c  [SPARK-31068][SQL] Avoid IllegalArgumentException in 
broadcast exchange
08bdc9c is described below

commit 08bdc9c9b2e44e066e44d35733e0f3b450d24052
Author: LantaoJin <[email protected]>
AuthorDate: Sun Mar 15 20:20:23 2020 -0500

    [SPARK-31068][SQL] Avoid IllegalArgumentException in broadcast exchange
    
    ### What changes were proposed in this pull request?
    Fix the IllegalArgumentException in broadcast exchange when numRows is over
    341 million but less than 512 million.
    
    Since the maximum number of keys that `BytesToBytesMap` supports is 1 << 29,
    and only 70% of the slots can be used before growing in `HashedRelation`,
    the limit should be no greater than 1 << 29 / 1.5 = 357,913,941 (roughly
    358 million) instead of 512 million.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Manually tested.
    
    Closes #27828 from LantaoJin/SPARK-31068.
    
    Lead-authored-by: LantaoJin <[email protected]>
    Co-authored-by: Alan Jin <[email protected]>
    Signed-off-by: Sean Owen <[email protected]>
---
 .../org/apache/spark/unsafe/map/BytesToBytesMap.java    |  3 +--
 .../sql/execution/exchange/BroadcastExchangeExec.scala  | 17 +++++++++++++----
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 7bdd894..a57cd3b 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -96,8 +96,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
    * since that's the largest power-of-2 that's less than Integer.MAX_VALUE. 
We need two long array
    * entries per key, giving us a maximum capacity of (1 &lt;&lt; 29).
    */
-  @VisibleForTesting
-  static final int MAX_CAPACITY = (1 << 29);
+  public static final int MAX_CAPACITY = (1 << 29);
 
   // This choice of page table size and page size means that we can address up 
to 500 gigabytes
   // of memory.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 65e6b7c..f69da86 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -34,7 +34,8 @@ import org.apache.spark.sql.execution.{SparkPlan, 
SQLExecution}
 import org.apache.spark.sql.execution.joins.HashedRelation
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.util.{SparkFatalException, ThreadUtils, Utils}
+import org.apache.spark.unsafe.map.BytesToBytesMap
+import org.apache.spark.util.{SparkFatalException, ThreadUtils}
 
 /**
  * A [[BroadcastExchangeExec]] collects, transforms and finally broadcasts the 
result of
@@ -43,6 +44,7 @@ import org.apache.spark.util.{SparkFatalException, 
ThreadUtils, Utils}
 case class BroadcastExchangeExec(
     mode: BroadcastMode,
     child: SparkPlan) extends Exchange {
+  import BroadcastExchangeExec._
 
   private[sql] val runId: UUID = UUID.randomUUID
 
@@ -82,9 +84,9 @@ case class BroadcastExchangeExec(
             val beforeCollect = System.nanoTime()
             // Use executeCollect/executeCollectIterator to avoid conversion 
to Scala types
             val (numRows, input) = child.executeCollectIterator()
-            if (numRows >= 512000000) {
+            if (numRows >= MAX_BROADCAST_TABLE_ROWS) {
               throw new SparkException(
-                s"Cannot broadcast the table with 512 million or more rows: 
$numRows rows")
+                s"Cannot broadcast the table over $MAX_BROADCAST_TABLE_ROWS 
rows: $numRows rows")
             }
 
             val beforeBuild = System.nanoTime()
@@ -104,7 +106,7 @@ case class BroadcastExchangeExec(
             }
 
             longMetric("dataSize") += dataSize
-            if (dataSize >= (8L << 30)) {
+            if (dataSize >= MAX_BROADCAST_TABLE_BYTES) {
               throw new SparkException(
                 s"Cannot broadcast the table that is larger than 8GB: 
${dataSize >> 30} GB")
             }
@@ -173,6 +175,13 @@ case class BroadcastExchangeExec(
 }
 
 object BroadcastExchangeExec {
+  // Since the maximum number of keys that BytesToBytesMap supports is 1 << 29,
+  // and only 70% of the slots can be used before growing in HashedRelation,
+  // here the limitation should not be over 341 million.
+  val MAX_BROADCAST_TABLE_ROWS = (BytesToBytesMap.MAX_CAPACITY / 1.5).toLong
+
+  val MAX_BROADCAST_TABLE_BYTES = 8L << 30
+
   private[execution] val executionContext = 
ExecutionContext.fromExecutorService(
       ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
         
SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to