Hi Jack,

"....  most SQL engines suffer from the same issue... ""

Sure. This behavior is not a bug, but rather a consequence of the
limitations of floating-point precision. The numbers involved in the
example (see [SPARK-47024] "Sum of floats/doubles may be incorrect
depending on partitioning"
<https://issues.apache.org/jira/browse/SPARK-47024>) exceed the precision
of the double-precision floating-point representation used by default in
Spark and other engines. It is interesting to have a look and test the code.

This is the code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

SUM_EXAMPLE = [
    (1.0,), (0.0,), (1.0,), (9007199254740992.0,),
]

spark = (
    SparkSession.builder.config("spark.log.level", "ERROR").getOrCreate()
)

def compare_sums(data, num_partitions):
    # result1: sum computed on a single partition
    df = spark.createDataFrame(data, "val double").coalesce(1)
    result1 = df.agg(sum(col("val"))).collect()[0][0]
    # result2: same data, shuffled across num_partitions partitions
    df = spark.createDataFrame(data, "val double").repartition(num_partitions)
    result2 = df.agg(sum(col("val"))).collect()[0][0]
    assert result1 == result2, f"{result1}, {result2}"

if __name__ == "__main__":
    print(compare_sums(SUM_EXAMPLE, 2))
In Python, floating-point numbers are implemented using the IEEE 754
double-precision standard
<https://stackoverflow.com/questions/73340696/how-is-pythons-decimal-and-other-precise-decimal-libraries-implemented-and-wh>,
which has limited precision. When one performs operations on very large
numbers, or on numbers with many decimal places, one may encounter
precision errors.
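
For instance, in a plain Python shell (nothing Spark-specific here), the
familiar "many decimal places" case already shows the effect:

>>> 0.1 + 0.2
0.30000000000000004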

Running the script fails with:

    print(compare_sums(SUM_EXAMPLE, 2))
  File "issue01.py", line 23, in compare_sums
    assert result1 == result2, f"{result1}, {result2}"
AssertionError: 9007199254740994.0, 9007199254740992.0
In the aforementioned case, the result of the aggregation (sum) is affected
by the precision limits of floating-point representation. The difference
between 9007199254740994.0 and 9007199254740992.0 is within the expected
precision limitations of double-precision floating-point numbers: above
2**53 = 9007199254740992, a double can no longer represent every integer,
and adjacent representable values are 2 apart.
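
You can see that boundary directly in a Python shell:

>>> 9007199254740992.0 + 1.0    # 2**53 + 1 rounds back down to 2**53
9007199254740992.0
>>> 9007199254740992.0 + 2.0    # 2**53 + 2 is exactly representable
9007199254740994.0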

The likely cause in this example:

When one performs an aggregate operation like sum on a DataFrame, the
result may depend on the order in which the values are added, and here that
order is influenced by the number of partitions in Spark. result2 above
creates a new DataFrame df with the same data but explicitly repartitions
it into two partitions (repartition(num_partitions)). Repartitioning
shuffles the data across partitions, so each partition is summed separately
and the partial sums are then combined, introducing a different order of
additions for the aggregation. The sum is therefore performed on the data
in a different order, leading to a slightly different result from result1.
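
Here is a minimal plain-Python sketch of what can happen. The exact split
Spark produces after repartition(2) may differ, but any split that groups
one of the 1.0 values with the large value (and the other 1.0 elsewhere)
reproduces the gap:

# Single partition: values added left to right
one_partition = ((1.0 + 0.0) + 1.0) + 9007199254740992.0
print(one_partition)    # 9007199254740994.0

# Two partitions (hypothetical split), partial sums merged afterwards
p1 = 0.0 + 1.0                     # 1.0
p2 = 1.0 + 9007199254740992.0      # rounds back down to 9007199254740992.0
print(p1 + p2)                     # still 9007199254740992.0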

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 13 Feb 2024 at 03:06, Jack Goodson <jackagood...@gmail.com> wrote:

> I may be ignorant of other debugging methods in Spark but the best success
> I've had is using smaller datasets (if runs take a long time) and adding
> intermediate output steps. This is quite different from application
> development in non-distributed systems where a debugger is trivial to
> attach but I believe it's one of the trade offs on using a system like
> Spark for data processing, most SQL engines suffer from the same issue. If
> you do believe there is a bug in Spark using the explain function like
> Herman mentioned helps as well as looking at the Spark plan in the Spark UI
>
> On Tue, Feb 13, 2024 at 9:24 AM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> OK, I figured it out. The details are in SPARK-47024
>> <https://issues.apache.org/jira/browse/SPARK-47024> for anyone who’s
>> interested.
>>
>> It turned out to be a floating point arithmetic “bug”. The main reason I
>> was able to figure it out was because I’ve been investigating another,
>> unrelated bug (a real bug) related to floats, so these weird float corner
>> cases have been top of mind.
>>
>> If it weren't for that, I wonder how much progress I would have made.
>> Though I could inspect the generated code, I couldn’t figure out how to get
>> logging statements placed in the generated code to print somewhere I could
>> see them.
>>
>> Depending on how often we find ourselves debugging aggregates like this,
>> it would be really helpful if we added some way to trace the aggregation
>> buffer.
>>
>> In any case, mystery solved. Thank you for the pointer!
>>
>>
>> On Feb 12, 2024, at 8:39 AM, Herman van Hovell <her...@databricks.com>
>> wrote:
>>
>> There is no really easy way of getting the state of the aggregation
>> buffer, unless you are willing to modify the code generation and sprinkle
>> in some logging.
>>
>> What I would start with is dumping the generated code by calling
>> explain('codegen') on the DataFrame. That helped me to find similar issues
>> in most cases.
>>
>> HTH
>>
>> On Sun, Feb 11, 2024 at 11:26 PM Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> Consider this example:
>>>
>>> >>> from pyspark.sql.functions import sum
>>> >>> spark.range(4).repartition(2).select(sum("id")).show()
>>> +-------+
>>> |sum(id)|
>>> +-------+
>>> |      6|
>>> +-------+
>>>
>>>
>>> I’m trying to understand how this works because I’m investigating a bug
>>> in this kind of aggregate.
>>>
>>> I see that doProduceWithoutKeys
>>> <https://github.com/apache/spark/blob/d02fbba6491fd17dc6bfc1a416971af7544952f3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala#L98>
>>>  and doConsumeWithoutKeys
>>> <https://github.com/apache/spark/blob/d02fbba6491fd17dc6bfc1a416971af7544952f3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala#L193>
>>>  are
>>> called, and I believe they are responsible for computing a declarative
>>> aggregate like `sum`. But I’m not sure how I would debug the generated
>>> code, or the inputs that drive what code gets generated.
>>>
>>> Say you were running the above example and it was producing an incorrect
>>> result, and you knew the problem was somehow related to the sum. How would
>>> you troubleshoot it to identify the root cause?
>>>
>>> Ideally, I would like some way to track how the aggregation buffer
>>> mutates as the computation is executed, so I can see something roughly like:
>>>
>>> [0, 1, 2, 3]
>>> [1, 5]
>>> [6]
>>>
>>>
>>> Is there some way to trace a declarative aggregate like this?
>>>
>>> Nick
>>>
>>>
>>
