I happened to encounter something similar.

It's most likely because you are only calling `explain()`. When you actually
run the query, you get the final Spark plan, and in that final plan the
exchange is reused.
Right, this is different compared with 3.1, probably because of the upgraded
AQE.

Not sure whether this is expected, though.
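
A quick way to check (a rough sketch, assuming the Spark 3.2 default of
spark.sql.adaptive.enabled=true):

    # Run the query so AQE can finalize the plan; explaining the same
    # DataFrame again should then print "AdaptiveSparkPlan isFinalPlan=true"
    # and show the ReusedExchange node.
    df4.collect()
    df4.explain()

    # Or, to compare with a 3.1-style plan at explain() time, disable AQE
    # before building df4:
    spark.conf.set("spark.sql.adaptive.enabled", "false")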

On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari <abdealikoth...@gmail.com>
wrote:

> Just thought I'd do a quick bump and add the dev mailing list, in case
> there is some insight there.
> It feels like this should be categorized as a bug for Spark 3.2.0.
>
> On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari <abdealikoth...@gmail.com>
> wrote:
>
>> Hi,
>> I am using PySpark for some projects, and one of the things we are doing
>> is trying to find the tables/columns being used by Spark from the
>> execution plan.
>>
>> When we upgraded to Spark 3.2, the Spark plan seems to be different from
>> previous versions, mainly when we are doing joins.
>> Below is a reproducible example (you can run the same code in versions 2.3
>> to 3.1 to see the difference).
>>
>> My original DataFrames have the columns id#0 and id#4,
>> but after doing the joins we are seeing new columns, id#34 and id#19, which
>> are not created from the original DataFrames I was working with.
>> In previous versions of Spark, the plan used a ReusedExchange step instead
>> (shown below).
>>
>> I was trying to understand whether this is expected in Spark 3.2, where the
>> execution plan seems to be creating a new data source which does not
>> originate from the df1 and df2 that I provided.
>> NOTE: The same happens even if I read from parquet files.
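>> (A rough sketch of the parquet variant, with placeholder /tmp paths:)
>> df1.write.parquet('/tmp/df1')
>> df2.write.parquet('/tmp/df2')
>> df1 = spark.read.parquet('/tmp/df1')
>> df2 = spark.read.parquet('/tmp/df2')
>> # ...then run the same joins as in In [3] below; the extra id#NN columns
>> # still show up in the plan.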
>>
>> In Spark 3.2:
>> In [1]: import pyspark
>>    ...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>
>> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>>    ...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
>>    ...: df1.explain()
>>    ...: df2.explain()
>> == Physical Plan ==
>> *(1) Scan ExistingRDD[id#0L,col1#1L]
>>
>> == Physical Plan ==
>> *(1) Scan ExistingRDD[id#4L,col2#5L]
>>
>> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>>    ...: df4 = df2.join(df3, df1['id'] == df2['id'])
>>    ...: df4.explain()
>> == Physical Plan ==
>> AdaptiveSparkPlan isFinalPlan=false
>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#53]
>>    :     +- Filter isnotnull(id#4L)
>>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>>    +- Project [id#0L, col1#1L, col2#20L]
>>       +- SortMergeJoin [id#0L], [id#19L], Inner
>>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#45]
>>          :     +- Filter isnotnull(id#0L)
>>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>>          +- Sort [id#19L ASC NULLS FIRST], false, 0
>>             +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>>                +- Filter isnotnull(id#19L)
>>                   +- Scan ExistingRDD[id#19L,col2#20L]
>>
>> In [4]: df1.createOrReplaceTempView('df1')
>>    ...: df2.createOrReplaceTempView('df2')
>>    ...: df3 = spark.sql("""
>>    ...:     SELECT df1.id, df1.col1, df2.col2
>>    ...:     FROM df1 JOIN df2 ON df1.id = df2.id
>>    ...: """)
>>    ...: df3.createOrReplaceTempView('df3')
>>    ...: df4 = spark.sql("""
>>    ...:     SELECT df2.*, df3.*
>>    ...:     FROM df2 JOIN df3 ON df2.id = df3.id
>>    ...: """)
>>    ...: df4.explain()
>> == Physical Plan ==
>> AdaptiveSparkPlan isFinalPlan=false
>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#110]
>>    :     +- Filter isnotnull(id#4L)
>>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>>    +- Project [id#0L, col1#1L, col2#35L]
>>       +- SortMergeJoin [id#0L], [id#34L], Inner
>>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#102]
>>          :     +- Filter isnotnull(id#0L)
>>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>>          +- Sort [id#34L ASC NULLS FIRST], false, 0
>>             +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>>                +- Filter isnotnull(id#34L)
>>                   +- Scan ExistingRDD[id#34L,col2#35L]
>>
>>
>> Doing this in Spark 3.1.1, the plan is:
>>
>> *(8) SortMergeJoin [id#4L], [id#0L], Inner
>> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
>> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>> :     +- *(1) Filter isnotnull(id#4L)
>> :        +- *(1) Scan ExistingRDD[id#4L,col2#5L]
>> +- *(7) Project [id#0L, col1#1L, col2#20L]
>>    +- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>>       :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>>       :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#62]
>>       :     +- *(3) Filter isnotnull(id#0L)
>>       :        +- *(3) Scan ExistingRDD[id#0L,col1#1L]
>>       +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
>>          +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>>
>>
