[ https://issues.apache.org/jira/browse/SPARK-28771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-28771:
----------------------------------
    Affects Version/s:     (was: 2.4.3)
                       3.0.0

> Join partitioned dataframes on superset of partitioning columns without shuffle
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-28771
>                 URL: https://issues.apache.org/jira/browse/SPARK-28771
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 3.0.0
>         Environment:
> {code:python}
> from pyspark import SparkConf
> from pyspark.sql import SparkSession
>
> conf = SparkConf()
> conf.setAll({
>     'spark.master': 'local[*]',
>     'spark.sql.execution.arrow.enabled': 'true',
>     'spark.sql.autoBroadcastJoinThreshold': '-1',
>     'spark.sql.shuffle.partitions': '10'
> }.items())
> spark = SparkSession.builder.config(conf=conf).getOrCreate()
> {code}
>
>            Reporter: Artem Bergkamp
>            Priority: Minor
>
> Hi Spark developers.
> A few months ago I asked this question on
> [stackoverflow|https://stackoverflow.com/questions/55229290/question-about-joining-dataframes-in-spark]
> but could not get a usable solution.
> The only valid suggestion was to implement it via Catalyst optimizer
> extensions, but this is not something an ordinary user can do.
> I decided to raise an improvement request because I think such functionality
> should be available out of the box.
> Suppose I have two partitioned dataframes:
> {code:python}
> df1 = spark.createDataFrame(
>     [(1, 1, 1), (2, 2, 2)], ['key1', 'key2', 'time']
> ).repartition(3, 'key1', 'key2')
>
> df2 = spark.createDataFrame(
>     [(1, 1, 1)], ['key1', 'key2', 'time']
> ).repartition(3, 'key1', 'key2')
> {code}
> *(scenario 1)* If I join them on [key1, key2], the join is performed
> within each partition without a shuffle (the resulting dataframe has the
> same number of partitions):
> {code:python}
> x = df1.join(df2, on=['key1', 'key2'], how='left')
> assert x.rdd.getNumPartitions() == 3
> {code}
> *(scenario 2)* But if I join them on [key1, key2, time], a shuffle takes
> place (the resulting dataframe has 10 partitions, as set by the
> spark.sql.shuffle.partitions option):
> {code:python}
> x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
> assert x.rdd.getNumPartitions() == 10
> {code}
> *(scenario 3)* Joining them on [key1, key2, time] via the other form of the
> join method, with explicit column conditions, also shuffles:
> {code:python}
> x = df1.join(df2, [
>     df1['key1'] == df2['key1'],
>     df1['key2'] == df2['key2'],
>     df1['time'] == df2['time']
> ], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
> assert x.rdd.getNumPartitions() == 10
> {code}
> *(scenario 4)* Joining them on [key1, key2, time] via the same form of the
> join method, but with the time equality condition replaced by an equivalent
> pair of inequalities. Surprisingly, *it uses the existing partitioning*:
> {code:python}
> x = df1.join(df2, [
>     df1['key1'] == df2['key1'],
>     df1['key2'] == df2['key2'],
>     (df1['time'] <= df2['time']) & (df1['time'] >= df2['time'])
> ], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
> assert x.rdd.getNumPartitions() == 3
> {code}
> *I expect all four join scenarios to use the existing partitioning and avoid
> a shuffle.*
> At the same time, groupBy and window operations on [key1, key2, time]
> preserve the number of partitions and run without a shuffle.
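The reasoning behind the request can be checked without Spark. The sketch below is plain Python, not Spark code: `hash_partition`, `left_join`, `partitionwise_join`, `satisfies_clustered` and `satisfies_exact` are hypothetical helpers, and Python's `hash()` stands in for Spark's Murmur3-based hash partitioning. It shows that when both sides are hash-partitioned on [key1, key2], rows that agree on [key1, key2, time] always land in the same partition, so joining partition i only with partition i gives the same result as a global join. The last two helpers contrast a subset rule with an exact-match rule; as far as we understand Spark 3.0's planner, groupBy and window accept the subset rule (ClusteredDistribution) while the sort-merge join asks for an exact match on its equi-join keys (HashClusteredDistribution). That would explain why scenarios 2 and 3 shuffle, while scenario 4, whose only equi-join keys are key1 and key2, does not.

```python
from collections import defaultdict

# Column order of every row tuple, mirroring the issue's dataframes.
COLUMNS = ("key1", "key2", "time")


def key_of(row, cols):
    """Project a row tuple onto the named columns."""
    return tuple(row[COLUMNS.index(c)] for c in cols)


def hash_partition(rows, cols, num_partitions):
    """Split rows into num_partitions buckets by hashing only `cols`.

    Hypothetical stand-in for hash partitioning: Python's hash() replaces
    Spark's Murmur3 hash, so bucket numbers will not match Spark's.
    """
    buckets = [[] for _ in range(num_partitions)]
    for row in rows:
        buckets[hash(key_of(row, cols)) % num_partitions].append(row)
    return buckets


def left_join(left, right, on):
    """Naive left equi-join on `on`; unmatched left rows pair with None."""
    index = defaultdict(list)
    for rrow in right:
        index[key_of(rrow, on)].append(rrow)
    result = []
    for lrow in left:
        for rrow in index.get(key_of(lrow, on), [None]):
            result.append((lrow, rrow))
    return result


def partitionwise_join(left, right, part_cols, join_cols, num_partitions):
    """Join bucket i of `left` only with bucket i of `right` (no data exchange)."""
    lparts = hash_partition(left, part_cols, num_partitions)
    rparts = hash_partition(right, part_cols, num_partitions)
    out = []
    for lp, rp in zip(lparts, rparts):
        out.extend(left_join(lp, rp, join_cols))
    return out


def satisfies_clustered(partition_cols, required_cols):
    # Subset rule: partitioning on a subset of the required columns suffices,
    # because equal values on the superset imply equal values on the subset.
    return set(partition_cols) <= set(required_cols)


def satisfies_exact(partition_cols, required_cols):
    # Exact-match rule: the partitioning columns must equal the join keys.
    return list(partition_cols) == list(required_cols)


# The issue's data: both sides partitioned on [key1, key2], joined on the superset.
rows1 = [(1, 1, 1), (2, 2, 2)]
rows2 = [(1, 1, 1)]
local = partitionwise_join(rows1, rows2, ["key1", "key2"], ["key1", "key2", "time"], 3)
full = left_join(rows1, rows2, ["key1", "key2", "time"])
assert sorted(local, key=repr) == sorted(full, key=repr)

print(satisfies_clustered(["key1", "key2"], ["key1", "key2", "time"]))  # True
print(satisfies_exact(["key1", "key2"], ["key1", "key2", "time"]))      # False
```

The direction matters: partitioning on the superset [key1, key2, time] and joining on the subset [key1, key2] would not be safe, since rows with the same keys but different times could sit in different buckets.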
--
This message was sent by Atlassian Jira
(v8.3.2#803003)