[ 
https://issues.apache.org/jira/browse/SPARK-28771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-28771:
----------------------------------
    Affects Version/s:     (was: 2.4.3)
                       3.0.0

> Join partitioned dataframes on superset of partitioning columns without 
> shuffle
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-28771
>                 URL: https://issues.apache.org/jira/browse/SPARK-28771
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 3.0.0
>         Environment:  
> {code:java}
> from pyspark import SparkConf
> from pyspark.sql import SparkSession
> conf = SparkConf()
> conf.setAll({
>   'spark.master': 'local[*]',
>   'spark.sql.execution.arrow.enabled': 'true',
>   'spark.sql.autoBroadcastJoinThreshold': '-1',
>   'spark.sql.shuffle.partitions': '10'
> }.items())
> spark = SparkSession.builder.config(conf=conf).getOrCreate()
> {code}
>  
>  
>            Reporter: Artem Bergkamp
>            Priority: Minor
>
> Hi Spark developers.
> A few months ago I asked this question on 
> [stackoverflow|https://stackoverflow.com/questions/55229290/question-about-joining-dataframes-in-spark]
>  but could not get a usable solution.
>  The only valid suggestion was to implement it via catalyst optimizer 
> extensions, but this is not something that an ordinary user can do.
>  I decided to raise an improvement request since I think such functionality 
> should be available out of the box.
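> For reference, such an extension has to be written in Scala against 
> SparkSessionExtensions and wired in through the spark.sql.extensions config; 
> only the config side is reachable from PySpark. A minimal sketch (the 
> extension class name below is hypothetical):
> {code:java}
> # Hypothetical: com.example.SupersetJoinExtensions would be a Scala class
> # implementing (SparkSessionExtensions => Unit) and injecting a custom
> # optimizer rule; the rule itself cannot be written in Python.
> spark = SparkSession.builder \
>     .config('spark.sql.extensions', 'com.example.SupersetJoinExtensions') \
>     .getOrCreate()
> {code}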
> Suppose I have two partitioned dataframes:
> {code:java}
> df1 = spark.createDataFrame(
>     [(1,1,1), (2,2,2)], ['key1', 'key2', 'time']
> ).repartition(3, 'key1', 'key2')
> df2 = spark.createDataFrame(
>     [(1,1,1)], ['key1', 'key2', 'time']
> ).repartition(3, 'key1', 'key2')
> {code}
> *(scenario 1)* If I join them by [key1, key2], the join is performed within 
> each partition without a shuffle (the number of partitions in the result 
> dataframe stays the same):
> {code:java}
> x = df1.join(df2, on=['key1', 'key2'], how='left')
> assert x.rdd.getNumPartitions() == 3
> {code}
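> This can be verified from the physical plan, which should contain no extra 
> Exchange beyond the initial repartitioning:
> {code:java}
> # The plan should show a join whose inputs reuse the existing
> # hashpartitioning(key1, key2, 3) without an additional Exchange (shuffle).
> x.explain()
> {code}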
> *(scenario 2)* But if I join them by [key1, key2, time], a shuffle takes 
> place (the number of partitions in the result dataframe is 10, driven by the 
> spark.sql.shuffle.partitions option):
> {code:java}
> x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
> assert x.rdd.getNumPartitions() == 10
> {code}
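> Here the plan should instead show an extra Exchange on both sides of the 
> join, repartitioning by all three columns:
> {code:java}
> # Expect something like Exchange hashpartitioning(key1, key2, time, 10)
> # on both join inputs, although the data is already partitioned by
> # (key1, key2).
> x.explain()
> {code}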
> *(scenario 3)* Joining them by [key1, key2, time] via another variant of the 
> join method also shuffles:
> {code:java}
> x = df1.join(df2, [
>     df1['key1'] == df2['key1'],
>     df1['key2'] == df2['key2'],
>     df1['time'] == df2['time']
> ], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
> assert x.rdd.getNumPartitions() == 10
> {code}
> *(scenario 4)* Joining them by [key1, key2, time] via the same variant of 
> the join method, but with the equality condition on time replaced by an 
> equivalent pair of inequalities, surprisingly *does use the partitioning*:
> {code:java}
> x = df1.join(df2, [
>     df1['key1'] == df2['key1'],
>     df1['key2'] == df2['key2'],
>     (df1['time'] <= df2['time']) & (df1['time'] >= df2['time'])
> ], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
> assert x.rdd.getNumPartitions() == 3
> {code}
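> A reusable form of this workaround could look as follows (just a sketch, the 
> helper name is made up): equi-join on the partitioning columns only, and 
> rewrite the equality on each extra column as a pair of inequalities so that 
> it stays a join filter instead of widening the required partitioning:
> {code:java}
> # Hypothetical helper built on the scenario 4 trick.
> def join_on_superset(left, right, part_keys, extra_keys, how='left'):
>     cond = [left[k] == right[k] for k in part_keys]
>     cond += [(left[k] <= right[k]) & (left[k] >= right[k]) for k in extra_keys]
>     joined = left.join(right, cond, how=how)
>     # drop the duplicated right-side columns, as in scenario 4
>     for k in part_keys + extra_keys:
>         joined = joined.drop(right[k])
>     return joined
> x = join_on_superset(df1, df2, ['key1', 'key2'], ['time'])
> assert x.rdd.getNumPartitions() == 3
> {code}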
> *I expect all four described join scenarios to use the existing partitioning 
> and avoid a shuffle.*
> At the same time, groupby and window operations by [key1, key2, time] 
> preserve the number of partitions and are done without a shuffle.
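> For comparison, the aggregation case can be checked the same way:
> {code:java}
> # Grouping by a superset of the partitioning columns is satisfied by the
> # existing hashpartitioning(key1, key2, 3), so no shuffle is added and
> # the partition count is preserved.
> g = df1.groupBy('key1', 'key2', 'time').count()
> assert g.rdd.getNumPartitions() == 3
> {code}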



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
