[ https://issues.apache.org/jira/browse/SPARK-49030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haejoon Lee updated SPARK-49030:
--------------------------------
Fix Version/s: 3.5.4
(was: 3.5.3)
> Self join of a CTE seems non-deterministic
> ------------------------------------------
>
> Key: SPARK-49030
> URL: https://issues.apache.org/jira/browse/SPARK-49030
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.0.0
> Environment: Tested with Spark 3.4.1, 3.5.1, and 4.0.0-preview.
> Reporter: Jihoon Son
> Priority: Minor
> Fix For: 3.5.4
>
> Attachments: screenshot-1.png
>
>
> {code:sql}
> WITH c AS (SELECT * FROM customer LIMIT 10)
> SELECT count(*)
> FROM c c1, c c2
> WHERE c1.c_customer_sk > c2.c_customer_sk
> {code}
> Consider a self-join on a CTE, such as the query above.
> Spark generates the following physical plan for this query:
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[], functions=[count(1)], output=[count(1)#194L])
>    +- HashAggregate(keys=[], functions=[partial_count(1)], output=[count#233L])
>       +- Project
>          +- BroadcastNestedLoopJoin BuildRight, Inner, (c_customer_sk#0 > c_customer_sk#214)
>             :- Filter isnotnull(c_customer_sk#0)
>             :  +- GlobalLimit 10, 0
>             :     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=256]
>             :        +- LocalLimit 10
>             :           +- FileScan parquet [c_customer_sk#0] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/some/path/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int>
>             +- BroadcastExchange IdentityBroadcastMode, [plan_id=263]
>                +- Filter isnotnull(c_customer_sk#214)
>                   +- GlobalLimit 10, 0
>                      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=259]
>                         +- LocalLimit 10
>                            +- FileScan parquet [c_customer_sk#214] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/some/path/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int>
> {code}
> Evaluating this plan produces a non-deterministic result because the CTE,
> including its LIMIT, is duplicated into both sides of the join and each copy
> is evaluated independently. Each copy can return different rows, so the join
> result can vary across runs.
> I understand that this query is not deterministic (and thus not very
> practical): because of how LIMIT works in a distributed engine, it is not
> expected to return the same rows across repeated runs. However, I would still
> expect the evaluation of a single query plan to be deterministic, i.e., both
> references to the CTE should see the same rows.
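> The extended explain output below suggests that the plan changes during
> optimization: in the analyzed plan, both sides of the join reference the same
> CTE definition (CTERelationRef 2), whereas in the optimized plan the CTE has
> been inlined into each side as a separate GlobalLimit/LocalLimit subtree.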
> {code:java}
> == Analyzed Logical Plan ==
> count(1): bigint
> WithCTE
> :- CTERelationDef 2, false
> :  +- SubqueryAlias c
> :     +- GlobalLimit 10
> :        +- LocalLimit 10
> :           +- Project [c_customer_sk#0, c_customer_id#1, c_current_cdemo_sk#2, c_current_hdemo_sk#3, c_current_addr_sk#4, c_first_shipto_date_sk#5, c_first_sales_date_sk#6, c_salutation#7, c_first_name#8, c_last_name#9, c_preferred_cust_flag#10, c_birth_day#11L, c_birth_month#12L, c_birth_year#13L, c_birth_country#14, c_login#15, c_email_address#16, c_last_review_date_sk#17]
> :              +- SubqueryAlias customer
> :                 +- View (`customer`, [c_customer_sk#0, c_customer_id#1, c_current_cdemo_sk#2, c_current_hdemo_sk#3, c_current_addr_sk#4, c_first_shipto_date_sk#5, c_first_sales_date_sk#6, c_salutation#7, c_first_name#8, c_last_name#9, c_preferred_cust_flag#10, c_birth_day#11L, c_birth_month#12L, c_birth_year#13L, c_birth_country#14, c_login#15, c_email_address#16, c_last_review_date_sk#17])
> :                    +- Relation [c_customer_sk#0,c_customer_id#1,c_current_cdemo_sk#2,c_current_hdemo_sk#3,c_current_addr_sk#4,c_first_shipto_date_sk#5,c_first_sales_date_sk#6,c_salutation#7,c_first_name#8,c_last_name#9,c_preferred_cust_flag#10,c_birth_day#11L,c_birth_month#12L,c_birth_year#13L,c_birth_country#14,c_login#15,c_email_address#16,c_last_review_date_sk#17] parquet
> +- Aggregate [count(1) AS count(1)#194L]
>    +- Filter (c_customer_sk#0 > c_customer_sk#176)
>       +- Join Inner
>          :- SubqueryAlias c1
>          :  +- SubqueryAlias c
>          :     +- CTERelationRef 2, true, [c_customer_sk#0, c_customer_id#1, c_current_cdemo_sk#2, c_current_hdemo_sk#3, c_current_addr_sk#4, c_first_shipto_date_sk#5, c_first_sales_date_sk#6, c_salutation#7, c_first_name#8, c_last_name#9, c_preferred_cust_flag#10, c_birth_day#11L, c_birth_month#12L, c_birth_year#13L, c_birth_country#14, c_login#15, c_email_address#16, c_last_review_date_sk#17], false
>          +- SubqueryAlias c2
>             +- SubqueryAlias c
>                +- CTERelationRef 2, true, [c_customer_sk#176, c_customer_id#177, c_current_cdemo_sk#178, c_current_hdemo_sk#179, c_current_addr_sk#180, c_first_shipto_date_sk#181, c_first_sales_date_sk#182, c_salutation#183, c_first_name#184, c_last_name#185, c_preferred_cust_flag#186, c_birth_day#187L, c_birth_month#188L, c_birth_year#189L, c_birth_country#190, c_login#191, c_email_address#192, c_last_review_date_sk#193], false
>
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count(1)#194L]
> +- Project
>    +- Join Inner, (c_customer_sk#0 > c_customer_sk#214)
>       :- Filter isnotnull(c_customer_sk#0)
>       :  +- GlobalLimit 10
>       :     +- LocalLimit 10
>       :        +- Project [c_customer_sk#0]
>       :           +- Relation [c_customer_sk#0,c_customer_id#1,c_current_cdemo_sk#2,c_current_hdemo_sk#3,c_current_addr_sk#4,c_first_shipto_date_sk#5,c_first_sales_date_sk#6,c_salutation#7,c_first_name#8,c_last_name#9,c_preferred_cust_flag#10,c_birth_day#11L,c_birth_month#12L,c_birth_year#13L,c_birth_country#14,c_login#15,c_email_address#16,c_last_review_date_sk#17] parquet
>       +- Filter isnotnull(c_customer_sk#214)
>          +- GlobalLimit 10
>             +- LocalLimit 10
>                +- Project [c_customer_sk#214]
>                   +- Relation [c_customer_sk#214,c_customer_id#215,c_current_cdemo_sk#216,c_current_hdemo_sk#217,c_current_addr_sk#218,c_first_shipto_date_sk#219,c_first_sales_date_sk#220,c_salutation#221,c_first_name#222,c_last_name#223,c_preferred_cust_flag#224,c_birth_day#225L,c_birth_month#226L,c_birth_year#227L,c_birth_country#228,c_login#229,c_email_address#230,c_last_review_date_sk#231] parquet
> {code}
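The effect of inlining the CTE can be illustrated with a minimal pure-Python simulation (not Spark code). It assumes a hypothetical `customer` table of four 25-row partitions with unique, non-null keys, and models the varying order in which partitions reach the `GlobalLimit` by shuffling the partition order; `PARTITIONS`, `limit_once`, `inlined_cte`, and `shared_cte` are illustrative names only. Under these assumptions, evaluating the CTE once always gives `count(*) = 45` (10 * 9 / 2 pairs of distinct keys), even though which 10 rows are selected varies, whereas re-evaluating the limit on each side of the join, as the optimized plan does, can return 0, 45, or 100.

```python
import random
from itertools import product

# Hypothetical stand-in for the `customer` table (assumption for illustration):
# unique, non-null keys 0..99 spread over four partitions of 25 rows each.
PARTITIONS = [list(range(start, start + 25)) for start in range(0, 100, 25)]
LIMIT = 10


def limit_once(partitions, n, rng):
    """Simulate LocalLimit followed by GlobalLimit.

    The order in which partitions arrive is not fixed, so it is modelled
    here by shuffling the partition order before taking n rows.
    """
    order = list(partitions)
    rng.shuffle(order)
    rows = [row for part in order for row in part[:n]]  # LocalLimit per partition
    return rows[:n]  # GlobalLimit over whatever arrived first


def count_greater_pairs(left, right):
    """count(*) of a cross join filtered by left key > right key."""
    return sum(1 for a, b in product(left, right) if a > b)


def inlined_cte(rng):
    # Like the optimized plan: each side of the join re-evaluates the limit.
    return count_greater_pairs(limit_once(PARTITIONS, LIMIT, rng),
                               limit_once(PARTITIONS, LIMIT, rng))


def shared_cte(rng):
    # Expected behaviour: evaluate the CTE once and feed it to both sides.
    c = limit_once(PARTITIONS, LIMIT, rng)
    return count_greater_pairs(c, c)


if __name__ == "__main__":
    rng = random.Random(0)
    print("inlined:", sorted({inlined_cte(rng) for _ in range(20)}))
    print("shared: ", sorted({shared_cte(rng) for _ in range(20)}))
```

Running it prints the distinct counts seen over 20 runs of each variant; the shared variant only ever yields 45, while the set printed for the inlined variant depends on the random seed.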
--
This message was sent by Atlassian Jira
(v8.20.10#820010)