Nikita Eshkeev created SPARK-43326:
--------------------------------------
Summary: CoalesceBucketsInJoin not working with SHUFFLE_HASH hint
Key: SPARK-43326
URL: https://issues.apache.org/jira/browse/SPARK-43326
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.3.2
Reporter: Nikita Eshkeev
h1. NOTICE: related to SPARK-43021
h1. What I did
I ran the following code:
{{from pyspark.sql import SparkSession}}
{{spark = (}}
{{ SparkSession}}
{{ .builder}}
{{ .appName("Bucketing")}}
{{ .master("local[4]")}}
{{ .config("spark.sql.bucketing.coalesceBucketsInJoin.enabled", True)}}
{{ .config("spark.sql.autoBroadcastJoinThreshold", "-1")}}
{{ .getOrCreate()}}
{{)}}
{{# AQE prevents CoalesceBucketsInJoin in 3.3.2}}
{{spark.conf.set("spark.sql.adaptive.enabled", False)}}
{{df1 = spark.range(0, 100)}}
{{df2 = spark.range(0, 100, 2)}}
{{df1.write.bucketBy(4, "id").mode("overwrite").saveAsTable("t1")}}
{{df2.write.bucketBy(2, "id").mode("overwrite").saveAsTable("t2")}}
{{t1 = spark.table("t1")}}
{{t2 = spark.table("t2")}}
{{t2.join(t1.hint("SHUFFLE_HASH"), "id").explain()}}
{{== Physical Plan ==}}
{{*(3) Project [id#23L]}}
{{+- *(3) ShuffledHashJoin [id#23L], [id#21L], Inner, BuildRight}}
{{   :- Exchange hashpartitioning(id#23L, 4), ENSURE_REQUIREMENTS, [plan_id=667]}}
{{   :  +- *(1) Filter isnotnull(id#23L)}}
{{   :     +- *(1) ColumnarToRow}}
{{   :        +- FileScan parquet spark_catalog.default.t2[id#23L] Batched: true, Bucketed: false (disabled by query planner), DataFilters: [isnotnull(id#23L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/notebooks/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>}}
{{   +- *(2) Filter isnotnull(id#21L)}}
{{      +- *(2) ColumnarToRow}}
{{         +- FileScan parquet spark_catalog.default.t1[id#21L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#21L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/notebooks/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4}}
h1. What happened
The join plan contains an Exchange node above the scan of t2, and that scan is reported as {{Bucketed: false (disabled by query planner)}}.
h1. What is expected
According to the documentation for
[{{spark.sql.bucketing.coalesceBucketsInJoin.enabled}}|https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration],
there should not be any Exchange (shuffle) nodes in the plan, because the number
of buckets of t1 (4) is divisible by the number of buckets of t2 (2). The
documentation does not say what should happen when join hints are applied.
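To make the expectation concrete, here is a minimal sketch of the eligibility condition as the configuration docs describe it: the larger bucket count must be divisible by the smaller one, and their ratio must not exceed {{spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio}} (assumed here to be at its default of 4). The helper name {{can_coalesce}} is hypothetical and this is not Spark's implementation; it only restates the documented rule and ignores any join-type or hint-specific handling.

```python
def can_coalesce(buckets_a: int, buckets_b: int, max_ratio: int = 4) -> bool:
    """Restate the documented bucket-coalescing condition (illustrative only).

    max_ratio stands in for spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio;
    the default of 4 is an assumption based on the Spark configuration docs.
    """
    larger, smaller = max(buckets_a, buckets_b), min(buckets_a, buckets_b)
    if smaller <= 0 or larger == smaller:
        # Invalid counts, or equal counts where nothing needs coalescing.
        return False
    # Divisibility plus the ratio cap.
    return larger % smaller == 0 and larger // smaller <= max_ratio


# Bucket counts from this report: t1 has 4 buckets, t2 has 2.
print(can_coalesce(4, 2))
```

By this rule alone the t1/t2 pair qualifies, which is why the Exchange in the plan above is unexpected.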
h1. Observation
# I checked the other hints (MERGE and SHUFFLE_REPLICATE_NL); they do not produce
any Exchange nodes in the plan.
# When I apply the hint to the t2 table instead ({{t2.hint("SHUFFLE_HASH").join(t1,
"id")}}), no Exchange appears for any hint.
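The comparison across hints and hint placements can be automated by scanning the plan text for Exchange operators. The following is a small sketch under stated assumptions: {{exchange_lines}} is a hypothetical helper, and the sample text is a shortened excerpt of the plan from this report. It works on plain strings, so it runs without a Spark session.

```python
import re

# Match "Exchange" as a standalone operator name. The leading word boundary
# means names such as BroadcastExchange or ReusedExchange are not matched.
_EXCHANGE = re.compile(r"\bExchange\b")


def exchange_lines(plan_text: str) -> list:
    """Return the plan lines that contain a standalone Exchange operator."""
    return [line.strip() for line in plan_text.splitlines() if _EXCHANGE.search(line)]


# Shortened excerpt of the physical plan quoted in this report.
SAMPLE_PLAN = """\
*(3) Project [id#23L]
+- *(3) ShuffledHashJoin [id#23L], [id#21L], Inner, BuildRight
   :- Exchange hashpartitioning(id#23L, 4), ENSURE_REQUIREMENTS, [plan_id=667]
   :  +- *(1) Filter isnotnull(id#23L)
   +- *(2) Filter isnotnull(id#21L)
"""

for line in exchange_lines(SAMPLE_PLAN):
    print(line)
```

To feed it a live plan, one option is to capture the text printed by {{explain()}} with {{contextlib.redirect_stdout}}, assuming the PySpark version in use prints the plan through Python's standard output, and then run the helper once per hint and per hinted side.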