[ 
https://issues.apache.org/jira/browse/SPARK-49030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haejoon Lee updated SPARK-49030:
--------------------------------
    Fix Version/s: 3.5.4
                       (was: 3.5.3)

> Self join of a CTE seems non-deterministic
> ------------------------------------------
>
>                 Key: SPARK-49030
>                 URL: https://issues.apache.org/jira/browse/SPARK-49030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0
>         Environment: Tested with Spark 3.4.1, 3.5.1, and 4.0.0-preview.
>            Reporter: Jihoon Son
>            Priority: Minor
>             Fix For: 3.5.4
>
>         Attachments: screenshot-1.png
>
>
> {code:java}
> WITH c AS (SELECT * FROM customer LIMIT 10)
> SELECT count(*)
> FROM c c1, c c2
> WHERE c1.c_customer_sk > c2.c_customer_sk{code}
> Consider a self-join query on a CTE, such as the one above.
> Spark generates a physical plan like the one below for this query.
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[], functions=[count(1)], output=[count(1)#194L])
>    +- HashAggregate(keys=[], functions=[partial_count(1)], output=[count#233L])
>       +- Project
>          +- BroadcastNestedLoopJoin BuildRight, Inner, (c_customer_sk#0 > c_customer_sk#214)
>             :- Filter isnotnull(c_customer_sk#0)
>             :  +- GlobalLimit 10, 0
>             :     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=256]
>             :        +- LocalLimit 10
>             :           +- FileScan parquet [c_customer_sk#0] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/some/path/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int>
>             +- BroadcastExchange IdentityBroadcastMode, [plan_id=263]
>                +- Filter isnotnull(c_customer_sk#214)
>                   +- GlobalLimit 10, 0
>                      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=259]
>                         +- LocalLimit 10
>                            +- FileScan parquet [c_customer_sk#214] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/some/path/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int>{code}
> Evaluating this plan produces a non-deterministic result because the limit is 
> pushed independently into the two sides of the join. Each limit can return a 
> different set of rows, so the join result can vary across runs.
> I understand that the query in question is itself not deterministic (and thus 
> not very practical): because of how a limit behaves in distributed engines, it 
> is not expected to return the same result across repeated runs anyway. 
> However, I would still expect the evaluation of the query plan to remain 
> deterministic.
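> Per the extended plan output below, it seems that the query plan changed at 
> some point during optimization: in the analyzed plan both join sides are 
> CTERelationRef nodes pointing at the same CTERelationDef, while the optimized 
> plan contains two independent GlobalLimit/LocalLimit subtrees.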
> {code:java}
> == Analyzed Logical Plan ==
> count(1): bigint
> WithCTE
> :- CTERelationDef 2, false
> :  +- SubqueryAlias c
> :     +- GlobalLimit 10
> :        +- LocalLimit 10
> :           +- Project [c_customer_sk#0, c_customer_id#1, c_current_cdemo_sk#2, c_current_hdemo_sk#3, c_current_addr_sk#4, c_first_shipto_date_sk#5, c_first_sales_date_sk#6, c_salutation#7, c_first_name#8, c_last_name#9, c_preferred_cust_flag#10, c_birth_day#11L, c_birth_month#12L, c_birth_year#13L, c_birth_country#14, c_login#15, c_email_address#16, c_last_review_date_sk#17]
> :              +- SubqueryAlias customer
> :                 +- View (`customer`, [c_customer_sk#0, c_customer_id#1, c_current_cdemo_sk#2, c_current_hdemo_sk#3, c_current_addr_sk#4, c_first_shipto_date_sk#5, c_first_sales_date_sk#6, c_salutation#7, c_first_name#8, c_last_name#9, c_preferred_cust_flag#10, c_birth_day#11L, c_birth_month#12L, c_birth_year#13L, c_birth_country#14, c_login#15, c_email_address#16, c_last_review_date_sk#17])
> :                    +- Relation [c_customer_sk#0,c_customer_id#1,c_current_cdemo_sk#2,c_current_hdemo_sk#3,c_current_addr_sk#4,c_first_shipto_date_sk#5,c_first_sales_date_sk#6,c_salutation#7,c_first_name#8,c_last_name#9,c_preferred_cust_flag#10,c_birth_day#11L,c_birth_month#12L,c_birth_year#13L,c_birth_country#14,c_login#15,c_email_address#16,c_last_review_date_sk#17] parquet
> +- Aggregate [count(1) AS count(1)#194L]
>    +- Filter (c_customer_sk#0 > c_customer_sk#176)
>       +- Join Inner
>          :- SubqueryAlias c1
>          :  +- SubqueryAlias c
>          :     +- CTERelationRef 2, true, [c_customer_sk#0, c_customer_id#1, c_current_cdemo_sk#2, c_current_hdemo_sk#3, c_current_addr_sk#4, c_first_shipto_date_sk#5, c_first_sales_date_sk#6, c_salutation#7, c_first_name#8, c_last_name#9, c_preferred_cust_flag#10, c_birth_day#11L, c_birth_month#12L, c_birth_year#13L, c_birth_country#14, c_login#15, c_email_address#16, c_last_review_date_sk#17], false
>          +- SubqueryAlias c2
>             +- SubqueryAlias c
>                +- CTERelationRef 2, true, [c_customer_sk#176, c_customer_id#177, c_current_cdemo_sk#178, c_current_hdemo_sk#179, c_current_addr_sk#180, c_first_shipto_date_sk#181, c_first_sales_date_sk#182, c_salutation#183, c_first_name#184, c_last_name#185, c_preferred_cust_flag#186, c_birth_day#187L, c_birth_month#188L, c_birth_year#189L, c_birth_country#190, c_login#191, c_email_address#192, c_last_review_date_sk#193], false
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count(1)#194L]
> +- Project
>    +- Join Inner, (c_customer_sk#0 > c_customer_sk#214)
>       :- Filter isnotnull(c_customer_sk#0)
>       :  +- GlobalLimit 10
>       :     +- LocalLimit 10
>       :        +- Project [c_customer_sk#0]
>       :           +- Relation [c_customer_sk#0,c_customer_id#1,c_current_cdemo_sk#2,c_current_hdemo_sk#3,c_current_addr_sk#4,c_first_shipto_date_sk#5,c_first_sales_date_sk#6,c_salutation#7,c_first_name#8,c_last_name#9,c_preferred_cust_flag#10,c_birth_day#11L,c_birth_month#12L,c_birth_year#13L,c_birth_country#14,c_login#15,c_email_address#16,c_last_review_date_sk#17] parquet
>       +- Filter isnotnull(c_customer_sk#214)
>          +- GlobalLimit 10
>             +- LocalLimit 10
>                +- Project [c_customer_sk#214]
>                   +- Relation [c_customer_sk#214,c_customer_id#215,c_current_cdemo_sk#216,c_current_hdemo_sk#217,c_current_addr_sk#218,c_first_shipto_date_sk#219,c_first_sales_date_sk#220,c_salutation#221,c_first_name#222,c_last_name#223,c_preferred_cust_flag#224,c_birth_day#225L,c_birth_month#226L,c_birth_year#227L,c_birth_country#228,c_login#229,c_email_address#230,c_last_review_date_sk#231] parquet {code}
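
The effect described in the report can be modeled without Spark. What follows is a minimal Python sketch, not Spark code and not the reporter's reproduction: `limit_scan`, `count_pairs`, `inlined_cte`, and `shared_cte` are hypothetical names, `random.sample` stands in for the arbitrary row choice of a distributed limit, and the keys are assumed to be distinct. It contrasts evaluating the limit once per CTE reference (as in the optimized plan above) with evaluating it once and reusing the rows for both join sides.

```python
import random


def limit_scan(rows, n, rng):
    """Model a distributed LIMIT n: returns n rows, but which n is arbitrary."""
    return rng.sample(rows, n)


def count_pairs(left, right):
    """Model SELECT count(*) FROM left, right WHERE left.key > right.key."""
    return sum(1 for a in left for b in right if a > b)


def inlined_cte(rows, n, rng):
    """Each CTE reference re-evaluates the limit independently."""
    c1 = limit_scan(rows, n, rng)
    c2 = limit_scan(rows, n, rng)
    return count_pairs(c1, c2)


def shared_cte(rows, n, rng):
    """The limit is evaluated once; both join sides see the same rows."""
    c = limit_scan(rows, n, rng)
    return count_pairs(c, c)


if __name__ == "__main__":
    keys = list(range(1000))  # assumed-distinct stand-in for c_customer_sk
    for seed in range(5):
        print("inlined:", inlined_cte(keys, 10, random.Random(seed)),
              "shared:", shared_cte(keys, 10, random.Random(seed)))
```

In this model the shared variant always returns 45 for 10 distinct keys (10 * 9 / 2 ordered pairs), whichever rows the limit happens to pick, while the inlined variant can return any count between 0 and 100.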



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
