[
https://issues.apache.org/jira/browse/SPARK-49030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jihoon Son updated SPARK-49030:
-------------------------------
Description:
{code:java}
WITH c AS (SELECT * FROM customer LIMIT 10)
SELECT count(*)
FROM c c1, c c2
WHERE c1.c_customer_sk > c2.c_customer_sk{code}
Consider a self-join on a CTE, such as the query above.
Spark generates the following physical plan for this query.
{code:java}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)], output=[count(1)#194L])
   +- HashAggregate(keys=[], functions=[partial_count(1)], output=[count#233L])
      +- Project
         +- BroadcastNestedLoopJoin BuildRight, Inner, (c_customer_sk#0 > c_customer_sk#214)
            :- Filter isnotnull(c_customer_sk#0)
            :  +- GlobalLimit 10, 0
            :     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=256]
            :        +- LocalLimit 10
            :           +- FileScan parquet [c_customer_sk#0] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/some/path/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int>
            +- BroadcastExchange IdentityBroadcastMode, [plan_id=263]
               +- Filter isnotnull(c_customer_sk#214)
                  +- GlobalLimit 10, 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=259]
                        +- LocalLimit 10
                           +- FileScan parquet [c_customer_sk#214] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/some/path/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int>{code}
Evaluating this plan produces a non-deterministic result because the limit is
pushed into the two sides of the join independently. Each limit can return a
different set of rows, so the join result can vary across runs.
I understand that the query itself is not deterministic (and thus not very
practical): because of how limit works in distributed engines, it is not
expected to return the same result across repeated runs anyway. However, I
would still expect the evaluation of the query plan to remain deterministic.
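To make the difference concrete, here is a toy model in plain Python (not Spark code). The `limit` helper, the 1000-row `customer` list, and the assumption that `c_customer_sk` values are distinct are all hypothetical stand-ins chosen for illustration. It compares a self-join over one shared evaluation of the limit with a self-join in which each side evaluates its own limit.

```python
import random


def limit(rows, n, rng):
    """Toy stand-in for LIMIT n without ORDER BY: returns n arbitrary rows."""
    return rng.sample(rows, n)


def self_join_count(left, right):
    """count(*) over the cross join, keeping pairs where left key > right key."""
    return sum(1 for l in left for r in right if l > r)


# Hypothetical table: 1000 distinct c_customer_sk values (an assumption).
customer = list(range(1000))
rng = random.Random(0)

# Shared evaluation: the limit runs once and both join sides read the same rows.
c = limit(customer, 10, rng)
shared = self_join_count(c, c)

# Independent evaluation: each join side runs its own limit.
independent = [
    self_join_count(limit(customer, 10, rng), limit(customer, 10, rng))
    for _ in range(5)
]

print(shared)       # 45: the number of ordered pairs among 10 distinct keys
print(independent)  # depends on which rows each limit happened to return
```

Under the distinct-key assumption, the shared evaluation yields 45 no matter which 10 rows the limit picks, whereas the independent evaluation can yield anything from 0 to 100.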
Per the extended plan output below, it seems that the query plan changed
at some point during optimization.
{code:java}
== Analyzed Logical Plan ==
count(1): bigint
WithCTE
:- CTERelationDef 2, false
:  +- SubqueryAlias c
:     +- GlobalLimit 10
:        +- LocalLimit 10
:           +- Project [c_customer_sk#0, c_customer_id#1, c_current_cdemo_sk#2, c_current_hdemo_sk#3, c_current_addr_sk#4, c_first_shipto_date_sk#5, c_first_sales_date_sk#6, c_salutation#7, c_first_name#8, c_last_name#9, c_preferred_cust_flag#10, c_birth_day#11L, c_birth_month#12L, c_birth_year#13L, c_birth_country#14, c_login#15, c_email_address#16, c_last_review_date_sk#17]
:              +- SubqueryAlias customer
:                 +- View (`customer`, [c_customer_sk#0, c_customer_id#1, c_current_cdemo_sk#2, c_current_hdemo_sk#3, c_current_addr_sk#4, c_first_shipto_date_sk#5, c_first_sales_date_sk#6, c_salutation#7, c_first_name#8, c_last_name#9, c_preferred_cust_flag#10, c_birth_day#11L, c_birth_month#12L, c_birth_year#13L, c_birth_country#14, c_login#15, c_email_address#16, c_last_review_date_sk#17])
:                    +- Relation [c_customer_sk#0,c_customer_id#1,c_current_cdemo_sk#2,c_current_hdemo_sk#3,c_current_addr_sk#4,c_first_shipto_date_sk#5,c_first_sales_date_sk#6,c_salutation#7,c_first_name#8,c_last_name#9,c_preferred_cust_flag#10,c_birth_day#11L,c_birth_month#12L,c_birth_year#13L,c_birth_country#14,c_login#15,c_email_address#16,c_last_review_date_sk#17] parquet
+- Aggregate [count(1) AS count(1)#194L]
   +- Filter (c_customer_sk#0 > c_customer_sk#176)
      +- Join Inner
         :- SubqueryAlias c1
         :  +- SubqueryAlias c
         :     +- CTERelationRef 2, true, [c_customer_sk#0, c_customer_id#1, c_current_cdemo_sk#2, c_current_hdemo_sk#3, c_current_addr_sk#4, c_first_shipto_date_sk#5, c_first_sales_date_sk#6, c_salutation#7, c_first_name#8, c_last_name#9, c_preferred_cust_flag#10, c_birth_day#11L, c_birth_month#12L, c_birth_year#13L, c_birth_country#14, c_login#15, c_email_address#16, c_last_review_date_sk#17], false
         +- SubqueryAlias c2
            +- SubqueryAlias c
               +- CTERelationRef 2, true, [c_customer_sk#176, c_customer_id#177, c_current_cdemo_sk#178, c_current_hdemo_sk#179, c_current_addr_sk#180, c_first_shipto_date_sk#181, c_first_sales_date_sk#182, c_salutation#183, c_first_name#184, c_last_name#185, c_preferred_cust_flag#186, c_birth_day#187L, c_birth_month#188L, c_birth_year#189L, c_birth_country#190, c_login#191, c_email_address#192, c_last_review_date_sk#193], false
== Optimized Logical Plan ==
Aggregate [count(1) AS count(1)#194L]
+- Project
   +- Join Inner, (c_customer_sk#0 > c_customer_sk#214)
      :- Filter isnotnull(c_customer_sk#0)
      :  +- GlobalLimit 10
      :     +- LocalLimit 10
      :        +- Project [c_customer_sk#0]
      :           +- Relation [c_customer_sk#0,c_customer_id#1,c_current_cdemo_sk#2,c_current_hdemo_sk#3,c_current_addr_sk#4,c_first_shipto_date_sk#5,c_first_sales_date_sk#6,c_salutation#7,c_first_name#8,c_last_name#9,c_preferred_cust_flag#10,c_birth_day#11L,c_birth_month#12L,c_birth_year#13L,c_birth_country#14,c_login#15,c_email_address#16,c_last_review_date_sk#17] parquet
      +- Filter isnotnull(c_customer_sk#214)
         +- GlobalLimit 10
            +- LocalLimit 10
               +- Project [c_customer_sk#214]
                  +- Relation [c_customer_sk#214,c_customer_id#215,c_current_cdemo_sk#216,c_current_hdemo_sk#217,c_current_addr_sk#218,c_first_shipto_date_sk#219,c_first_sales_date_sk#220,c_salutation#221,c_first_name#222,c_last_name#223,c_preferred_cust_flag#224,c_birth_day#225L,c_birth_month#226L,c_birth_year#227L,c_birth_country#228,c_login#229,c_email_address#230,c_last_review_date_sk#231] parquet {code}
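The change between the two logical plans above (one `CTERelationDef` with two `CTERelationRef` nodes in the analyzed plan, two separate limit subtrees in the optimized plan) looks like the effect of inlining each CTE reference. The following is a minimal sketch of that kind of rewrite in plain Python, assuming inlining is indeed what happens here. The `Node` class and the `inline_cte` and `count_ops` helpers are hypothetical and are not Spark's optimizer code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """Minimal plan node: an operator name plus child nodes."""
    op: str
    children: tuple = ()


def inline_cte(plan, cte_def):
    """Replace every CTERef leaf with the CTE definition subtree."""
    if plan.op == "CTERef":
        return cte_def
    return Node(plan.op, tuple(inline_cte(c, cte_def) for c in plan.children))


def count_ops(plan, op):
    """Count how many times an operator occurs when walking the tree."""
    return (plan.op == op) + sum(count_ops(c, op) for c in plan.children)


# CTE definition: a limit over the base relation.
cte_def = Node("GlobalLimit 10", (Node("LocalLimit 10", (Node("Relation customer"),)),))

# Main query: a self-join whose two sides both reference the CTE.
query = Node(
    "Aggregate",
    (Node("Filter", (Node("Join Inner", (Node("CTERef"), Node("CTERef"))),)),),
)

inlined = inline_cte(query, cte_def)
print(count_ops(query, "GlobalLimit 10"), count_ops(inlined, "GlobalLimit 10"))  # 0 2
```

After the rewrite the limit appears once under each side of the join, so a tree-walking evaluator would execute it twice, which matches the shape of the optimized plan above.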
> Self join of a CTE seems non-deterministic
> ------------------------------------------
>
> Key: SPARK-49030
> URL: https://issues.apache.org/jira/browse/SPARK-49030
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.0.0
> Environment: Tested with Spark 3.4.1, 3.5.1, and 4.0.0-preview.
> Reporter: Jihoon Son
> Priority: Minor