[ https://issues.apache.org/jira/browse/SPARK-47520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

William Montaz updated SPARK-47520:
-----------------------------------
    Description: 
We discovered an important correctness issue directly linked to SPARK-47024.

Even though SPARK-47024 was resolved as 'Not a Problem' because it relates purely 
to float/double rounding, it can still have drastic impacts when combined with 
spark.sql.execution.sortBeforeRepartition set to true (the default).

We consistently reproduced the issue with a GROUP BY containing a SUM over a float 
or double column, followed by a repartition (a common pattern to produce bigger 
output files, triggered either by SQL hints or by extensions such as Kyuubi). 
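
For illustration, a minimal sketch of that pattern in Scala; the paths, column 
names and partition count are hypothetical, not taken from our actual job:

    // Hypothetical reproduction sketch: SUM over a double column inside a GROUP BY,
    // followed by a repartition. Paths, names and numbers are illustrative only.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.sum

    val spark = SparkSession.builder().appName("sum-then-repartition").getOrCreate()

    val df = spark.read.parquet("/path/to/input")
    val aggregated = df.groupBy("key").agg(sum("value").as("total"))   // float/double SUM
    // The repartition that adds the extra shuffle (same shape as a
    // /*+ REPARTITION(n) */ hint or the file-compaction done by Kyuubi).
    aggregated.repartition(200).write.parquet("/path/to/output")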

If the repartition stage fails with a FetchFailedException for only a few tasks, 
Spark recomputes the partitions of the previous stage whose output could not be 
fetched and retries only the failed partitions downstream.

Because block fetch order is nondeterministic, the recomputed upstream partition 
can produce a slightly different value for a float/double sum aggregation. In all 
of our attempts we observed a 1-bit difference in the UnsafeRow backing byte 
array. The sort performed before repartition uses UnsafeRow.hashCode for the row 
prefix, which is completely different even with such a 1-bit difference. The sort 
order in the recomputed upstream partition is therefore completely different, and 
the target partitions chosen for the shuffled rows change as well.
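
The mechanism can be sketched outside Spark. This is only an illustration, not 
Spark's actual UnsafeRow.hashCode or shuffle partitioner: two accumulation orders 
of the same doubles differ by a single ULP, yet a byte-level hash of the result is 
completely different, so the sort prefix and the target partition change too.

    import java.nio.ByteBuffer
    import scala.util.hashing.MurmurHash3

    // The same three doubles summed in two orders: the results differ by 1 ULP.
    val sumOrderA = (0.1 + 0.2) + 0.3   // 0.6000000000000001
    val sumOrderB = 0.1 + (0.2 + 0.3)   // 0.6

    // Hash the 8-byte representation, loosely mimicking a hash over row bytes
    // (the seed 42 is arbitrary here).
    def hashOfBits(d: Double): Int =
      MurmurHash3.bytesHash(ByteBuffer.allocate(8).putDouble(d).array(), 42)

    println(java.lang.Double.doubleToLongBits(sumOrderA) -
            java.lang.Double.doubleToLongBits(sumOrderB))        // 1: a single ULP apart
    println(hashOfBits(sumOrderA))                               // unrelated hash values...
    println(hashOfBits(sumOrderB))
    val numPartitions = 200
    println(Math.floorMod(hashOfBits(sumOrderA), numPartitions)) // ...so the row almost certainly
    println(Math.floorMod(hashOfBits(sumOrderB), numPartitions)) // lands in a different partition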

Because the sort becomes nondeterministic and only the failed downstream tasks are 
retried, the resulting repartition produces duplicate rows as well as missing 
rows. The solution brought by SPARK-23207 is broken in this case.

So far, the only mitigation we have is to deactivate 
spark.sql.execution.sortBeforeRepartition, which makes the entire job fail instead 
of silently producing incorrect results.
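
For reference, the workaround looks like this (assuming the conf can be set on the 
session; it can equally be passed with --conf at submit time):

    // Workaround sketch: with the sort disabled, the scenario above makes the job
    // fail outright instead of silently producing duplicate/missing rows.
    spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
    // or at submit time:
    //   spark-submit --conf spark.sql.execution.sortBeforeRepartition=false ...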

> Rounding issues with sum of floats/doubles leads to incorrect data after 
> repartition
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-47520
>                 URL: https://issues.apache.org/jira/browse/SPARK-47520
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.2, 3.3.2, 3.5.0
>            Reporter: William Montaz
>            Priority: Major
>              Labels: correctness


