This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bf178e1  [SPARK-36898][SQL] Make the shuffle hash join factor 
configurable
bf178e1 is described below

commit bf178e167f80a167a170d17d77d191022792c641
Author: Ke Jia <ke.a....@intel.com>
AuthorDate: Wed Oct 20 17:16:30 2021 +0800

    [SPARK-36898][SQL] Make the shuffle hash join factor configurable
    
    ### What changes were proposed in this pull request?
    Currently the shuffle hash join factor is 3x, which cannot be changed. This 
PR make the factor can be configurable by adding a new config 
"spark.sql.shuffleHashJoinFactor".
    
    ### Why are the changes needed?
    Make the choice of shuffle hash join more flexible
    
    ### Does this PR introduce _any_ user-facing change?
    Add new config
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #34150 from JkSelf/shjconfig.
    
    Authored-by: Ke Jia <ke.a....@intel.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/sql/catalyst/optimizer/joins.scala | 11 ++++++-----
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala    |  8 ++++++++
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 90b25a9..e945ee3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -275,7 +275,7 @@ trait JoinSelectionHelper {
     } else {
       hintToPreferShuffleHashJoinLeft(hint) ||
         (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) &&
-          muchSmaller(left, right)) ||
+          muchSmaller(left, right, conf)) ||
         forceApplyShuffledHashJoin(conf)
     }
     val buildRight = if (hintOnly) {
@@ -283,7 +283,7 @@ trait JoinSelectionHelper {
     } else {
       hintToPreferShuffleHashJoinRight(hint) ||
         (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) 
&&
-          muchSmaller(right, left)) ||
+          muchSmaller(right, left, conf)) ||
         forceApplyShuffledHashJoin(conf)
     }
     getBuildSide(
@@ -426,14 +426,15 @@ trait JoinSelectionHelper {
   }
 
   /**
-   * Returns whether plan a is much smaller (3X) than plan b.
+   * Returns true if the data size of plan a multiplied by 
SHUFFLE_HASH_JOIN_FACTOR
+   * is smaller than plan b.
    *
    * The cost to build hash map is higher than sorting, we should only build 
hash map on a table
    * that is much smaller than other one. Since we does not have the statistic 
for number of rows,
    * use the size of bytes here as estimation.
    */
-  private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
-    a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
+  private def muchSmaller(a: LogicalPlan, b: LogicalPlan, conf: SQLConf): 
Boolean = {
+    a.stats.sizeInBytes * conf.getConf(SQLConf.SHUFFLE_HASH_JOIN_FACTOR) <= 
b.stats.sizeInBytes
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c90d6f8..eed74da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -416,6 +416,14 @@ object SQLConf {
     .bytesConf(ByteUnit.BYTE)
     .createWithDefaultString("10MB")
 
+  val SHUFFLE_HASH_JOIN_FACTOR = buildConf("spark.sql.shuffledHashJoinFactor")
+    .doc("The shuffle hash join can be selected if the data size of small" +
+      " side multiplied by this factor is still smaller than the large side.")
+    .version("3.3.0")
+    .intConf
+    .checkValue(_ >= 1, "The shuffle hash join factor cannot be negative.")
+    .createWithDefault(3)
+
   val LIMIT_SCALE_UP_FACTOR = buildConf("spark.sql.limit.scaleUpFactor")
     .internal()
     .doc("Minimal increase rate in number of partitions between attempts when 
executing a take " +

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to