This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c059c8402df6 [SPARK-48421][SQL] SPJ: Add documentation
c059c8402df6 is described below

commit c059c8402df66586e1a6c5fe72a9f1aa4e5e5a48
Author: Szehon Ho <[email protected]>
AuthorDate: Wed Jun 12 16:50:15 2024 -0700

    [SPARK-48421][SQL] SPJ: Add documentation
    
    ### What changes were proposed in this pull request?
    Add docs for SPJ
    
    ### Why are the changes needed?
    There are no docs describing SPJ, even though it is mentioned in migration 
notes:  https://github.com/apache/spark/pull/46673
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Checked the new text
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46745 from szehon-ho/doc_spj.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 docs/_data/menu-sql.yaml       |   2 +
 docs/sql-performance-tuning.md | 119 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 121 insertions(+)

diff --git a/docs/_data/menu-sql.yaml b/docs/_data/menu-sql.yaml
index 059a9bdc1af4..01c8a8076958 100644
--- a/docs/_data/menu-sql.yaml
+++ b/docs/_data/menu-sql.yaml
@@ -63,6 +63,8 @@
       url: sql-performance-tuning.html#optimizing-the-join-strategy
     - text: Adaptive Query Execution
       url: sql-performance-tuning.html#adaptive-query-execution
+    - text: Storage Partition Join
+      url: sql-performance-tuning.html#storage-partition-join
 - text: Distributed SQL Engine
   url: sql-distributed-sql-engine.html
   subitems:
diff --git a/docs/sql-performance-tuning.md b/docs/sql-performance-tuning.md
index b443e3d9c5f5..12b79828e44c 100644
--- a/docs/sql-performance-tuning.md
+++ b/docs/sql-performance-tuning.md
@@ -428,3 +428,122 @@ You can control the details of how AQE works by providing 
your own cost evaluato
       <td>3.2.0</td>
     </tr>
   </table>
+
+## Storage Partition Join
+
+Storage Partition Join (SPJ) is an optimization technique in Spark SQL that 
makes use the existing storage layout to avoid the shuffle phase.
+
+This is a generalization of the concept of Bucket Joins, which is only 
applicable for 
[bucketed](sql-data-sources-load-save-functions.html#bucketing-sorting-and-partitioning)
 tables, to tables partitioned by functions registered in FunctionCatalog. 
Storage Partition Joins are currently supported for compatible V2 DataSources.
+
+The following SQL properties enable Storage Partition Join in different join 
queries with various optimizations.
+
+  <table class="spark-config">
+    <thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since 
Version</th></tr></thead>
+    <tr>
+      <td><code>spark.sql.sources.v2.bucketing.enabled</code></td>
+      <td>false</td>
+      <td>
+        When true, try to eliminate shuffle by using the partitioning reported 
by a compatible V2 data source.
+      </td>
+      <td>3.3.0</td>
+    </tr>
+    <tr>
+      
<td><code>spark.sql.sources.v2.bucketing.pushPartValues.enabled</code></td>
+      <td>true</td>
+      <td>
+        When enabled, try to eliminate shuffle if one side of the join has 
missing partition values from the other side. This config requires 
<code>spark.sql.sources.v2.bucketing.enabled</code> to be true.
+      </td>
+      <td>3.4.0</td>
+    </tr>
+    <tr>
+      <td><code>spark.sql.requireAllClusterKeysForCoPartition</code></td>
+      <td>true</td>
+      <td>
+        When true, require the join or MERGE keys to be same and in the same 
order as the partition keys to eliminate shuffle. Hence, set to <b>false</b> in 
this situation to eliminate shuffle.
+      </td>
+      <td>3.4.0</td>
+    </tr>
+    <tr>
+      
<td><code>spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled</code></td>
+      <td>false</td>
+      <td>
+        When true, and when the join is not a full outer join, enable skew 
optimizations to handle partitions with large amounts of data when avoiding 
shuffle. One side will be chosen as the big table based on table statistics, 
and the splits on this side will be partially-clustered. The splits of the 
other side will be grouped and replicated to match. This config requires both 
<code>spark.sql.sources.v2.bucketing.enabled</code> and 
<code>spark.sql.sources.v2.bucketing.pushPartValues.ena [...]
+      </td>
+      <td>3.4.0</td>
+    </tr>
+    <tr>
+      
<td><code>spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled</code></td>
+      <td>false</td>
+      <td>
+        When enabled, try to avoid shuffle if join or MERGE condition does not 
include all partition columns. This config requires both 
<code>spark.sql.sources.v2.bucketing.enabled</code> and 
<code>spark.sql.sources.v2.bucketing.pushPartValues.enabled</code> to be true, 
and <code>spark.sql.requireAllClusterKeysForCoPartition</code> to be false.
+      </td>
+      <td>4.0.0</td>
+    </tr>
+    <tr>
+      
<td><code>spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled</code></td>
+      <td>false</td>
+      <td>
+        When enabled, try to avoid shuffle if partition transforms are 
compatible but not identical. This config requires both 
<code>spark.sql.sources.v2.bucketing.enabled</code> and 
<code>spark.sql.sources.v2.bucketing.pushPartValues.enabled</code> to be true.
+      </td>
+      <td>4.0.0</td>
+    </tr>
+    <tr>
+      <td><code>spark.sql.sources.v2.bucketing.shuffle.enabled</code></td>
+      <td>false</td>
+      <td>
+        When enabled, try to avoid shuffle on one side of the join, by 
recognizing the partitioning reported by a V2 data source on the other side.
+      </td>
+      <td>4.0.0</td>
+    </tr>
+  </table>
+
+If Storage Partition Join is performed, the query plan will not contain 
Exchange nodes prior to the join.
+
+The following example uses Iceberg 
([https://iceberg.apache.org/docs/latest/spark-getting-started/](https://iceberg.apache.org/docs/latest/spark-getting-started/)),
 a Spark V2 DataSource that supports Storage Partition Join.
+```sql
+CREATE TABLE prod.db.target (id INT, salary INT, dep STRING)
+USING iceberg
+PARTITIONED BY (dep, bucket(8, id))
+
+CREATE TABLE prod.db.source (id INT, salary INT, dep STRING)
+USING iceberg
+PARTITIONED BY (dep, bucket(8, id))
+
+EXPLAIN SELECT * FROM target t INNER JOIN source s
+ON t.dep = s.dep AND t.id = s.id
+
+-- Plan without Storage Partition Join
+== Physical Plan ==
+* Project (12)
++- * SortMergeJoin Inner (11)
+   :- * Sort (5)
+   :  +- Exchange (4) // DATA SHUFFLE
+   :     +- * Filter (3)
+   :        +- * ColumnarToRow (2)
+   :           +- BatchScan (1)
+   +- * Sort (10)
+      +- Exchange (9) // DATA SHUFFLE
+         +- * Filter (8)
+            +- * ColumnarToRow (7)
+               +- BatchScan (6)
+
+
+SET 'spark.sql.sources.v2.bucketing.enabled' 'true'
+SET 'spark.sql.iceberg.planning.preserve-data-grouping' 'true'
+SET 'spark.sql.sources.v2.bucketing.pushPartValues.enabled' 'true'
+SET 'spark.sql.requireAllClusterKeysForCoPartition' 'false'
+SET 'spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled' 
'true'
+
+-- Plan with Storage Partition Join
+== Physical Plan ==
+* Project (10)
++- * SortMergeJoin Inner (9)
+   :- * Sort (4)
+   :  +- * Filter (3)
+   :     +- * ColumnarToRow (2)
+   :        +- BatchScan (1)
+   +- * Sort (8)
+      +- * Filter (7)
+         +- * ColumnarToRow (6)
+            +- BatchScan (5)
+```
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to