Nikita Eshkeev created SPARK-43326:
--------------------------------------

             Summary: CoalesceBucketsInJoin not working with SHUFFLE_HASH hint
                 Key: SPARK-43326
                 URL: https://issues.apache.org/jira/browse/SPARK-43326
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.3.2
            Reporter: Nikita Eshkeev


h1. NOTICE: related to SPARK-43021
h1. What I did

I ran the following code; the physical plan printed by {{explain()}} follows it:

{{from pyspark.sql import SparkSession}}

{{spark = (}}
{{  SparkSession}}
{{    .builder}}
{{    .appName("Bucketing")}}
{{    .master("local[4]")}}
{{    .config("spark.sql.bucketing.coalesceBucketsInJoin.enabled", True)}}
{{    .config("spark.sql.autoBroadcastJoinThreshold", "-1")}}
{{    .getOrCreate()}}
{{)}}

{{# AQE prevents CoalesceBucketsInJoin in 3.3.2}}
{{spark.conf.set("spark.sql.adaptive.enabled", False)}}

{{df1 = spark.range(0, 100)}}
{{df2 = spark.range(0, 100, 2)}}

{{df1.write.bucketBy(4, "id").mode("overwrite").saveAsTable("t1")}}
{{df2.write.bucketBy(2, "id").mode("overwrite").saveAsTable("t2")}}

{{t1 = spark.table("t1")}}
{{t2 = spark.table("t2")}}

{{t2.join(t1.hint("SHUFFLE_HASH"), "id").explain()}}

{{== Physical Plan ==}}
{{*(3) Project [id#23L]}}
{{+- *(3) ShuffledHashJoin [id#23L], [id#21L], Inner, BuildRight}}
{{   :- Exchange hashpartitioning(id#23L, 4), ENSURE_REQUIREMENTS, [plan_id=667]}}
{{   :  +- *(1) Filter isnotnull(id#23L)}}
{{   :     +- *(1) ColumnarToRow}}
{{   :        +- FileScan parquet spark_catalog.default.t2[id#23L] Batched: true, Bucketed: false (disabled by query planner), DataFilters: [isnotnull(id#23L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/notebooks/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>}}
{{   +- *(2) Filter isnotnull(id#21L)}}
{{      +- *(2) ColumnarToRow}}
{{         +- FileScan parquet spark_catalog.default.t1[id#21L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#21L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/notebooks/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4}}
h1. What happened

The join plan contains an Exchange node on the t2 side, and the t2 scan reports {{Bucketed: false (disabled by query planner)}}.
h1. What is expected

According to the docs for 
[{{spark.sql.bucketing.coalesceBucketsInJoin.enabled}}|https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration],
 the plan should not contain any Exchange/Shuffle nodes, because the number of 
buckets of t1 (4) is divisible by the number of buckets of t2 (2). The docs do 
not say what should happen when join hints are applied.
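
The expectation can be checked mechanically instead of by reading the plan by eye. The following is a minimal sketch, not part of Spark: {{plan_text}} and {{shuffle_exchanges}} are hypothetical helper names introduced here. It assumes that PySpark's {{DataFrame.explain()}} writes to Python's stdout and that a shuffle appears in the plan as a standalone {{Exchange}} token, as it does in the plan above.

```python
import contextlib
import io
import re


def plan_text(df) -> str:
    """Capture what df.explain() prints (assumes it writes to Python's stdout)."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def shuffle_exchanges(plan: str) -> list:
    """Return the plan lines that contain a standalone `Exchange` token.

    The word-boundary match skips names such as BroadcastExchange.
    """
    return [
        line.strip()
        for line in plan.splitlines()
        if re.search(r"\bExchange\b", line)
    ]
```

With the session from the reproduction, {{shuffle_exchanges(plan_text(t2.join(t1.hint("SHUFFLE_HASH"), "id")))}} would be empty if bucket coalescing had been applied; for the plan reported here it should instead contain the {{Exchange hashpartitioning(...)}} line.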

h1. Observation

# I checked the other join hints (MERGE and SHUFFLE_REPLICATE_NL); they do not 
produce any Exchange nodes in the plan.
# When I apply the hint to the t2 table instead 
({{t2.hint("SHUFFLE_HASH").join(t1, "id")}}), no Exchange appears for any of the hints.
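
Both observations can be reproduced in one pass by applying each hint to each side of the join in turn. The sketch below is illustrative only: {{hint_matrix}} is a hypothetical helper name, it assumes {{t1}} and {{t2}} are the bucketed tables from the reproduction, and it assumes {{explain()}} prints to Python's stdout so that the output can be captured.

```python
import contextlib
import io
import itertools
import re

HINTS = ["SHUFFLE_HASH", "MERGE", "SHUFFLE_REPLICATE_NL"]


def hint_matrix(t1, t2, key="id"):
    """Map (hinted_side, hint) -> True if the printed plan mentions an Exchange."""
    results = {}
    for side, hint in itertools.product(("t1", "t2"), HINTS):
        # Hint exactly one side; t2 always stays on the left of the join.
        if side == "t1":
            joined = t2.join(t1.hint(hint), key)
        else:
            joined = t2.hint(hint).join(t1, key)
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):
            joined.explain()
        results[(side, hint)] = re.search(r"\bExchange\b", buf.getvalue()) is not None
    return results
```

Going by the observations above, only the {{("t1", "SHUFFLE_HASH")}} entry would be expected to come back {{True}} on 3.3.2.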



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
