Re: [Spark Core] Potential bug in JavaRDD#countByValue

2024-02-27 Thread Mich Talebzadeh
Hi,

Quick observations from what you have provided:

- The observed discrepancy between rdd.count() and
rdd.map(Item::getType).countByValue() in distributed mode suggests a
potential aggregation issue with countByValue(). The correct results in
local mode lend credence to this theory.
- Your workarounds using mapToPair() and reduceByKey() produce the same
incorrect results, indicating a broader pattern rather than
method-specific behaviour.
- Dataset.groupBy().count() yields accurate results, but this route incurs
the overhead of an RDD-to-Dataset conversion (a sketch of that conversion
follows below).
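
For reference, that RDD-to-Dataset conversion might look roughly like the
sketch below. This is only an illustration, not tested against your job:
it assumes a SparkSession named spark, that Item.getType() returns a
String, and the usual org.apache.spark.sql imports (Dataset, Encoders).

// Hypothetical sketch of the Dataset-based count, assuming Item.getType()
// returns a String and "spark" is your SparkSession
JavaRDD<String> types = rdd.map(Item::getType);
// Encoders.STRING() exposes the data as a single column named "value"
Dataset<String> typesDs = spark.createDataset(types.rdd(), Encoders.STRING());
typesDs.groupBy("value").count().show();

The aggregation then runs through the Dataset machinery rather than
countByValue(), which is presumably why it dodges whatever goes wrong in
the RDD path.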

Your expected total count of 75187 is roughly 7 times the observed count
of 10519 (75187 / 10519 ≈ 7.1), which matches your 7 executors. This
suggests that results are being aggregated incorrectly, or only partially,
across executors.

Now, before raising a red flag, these could be the culprits:

- Data skew: an uneven distribution of data across executors could cause
partial aggregation if a single executor processes most items of a
particular type.
- Partial aggregation: Spark might be combining the partial counts from
the executors incorrectly, leading to inaccuracies. The quick check
sketched below can help tell these two cases apart.
- Finally, a bug in Spark 3.5 is possible.
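
One quick way to separate the first two, offered purely as an untested
sketch against your existing rdd (assuming scala.Tuple2 and java.util.List
are imported): aggregate the per-type counts on the executors and bring
them back with collect() as a plain list, instead of collectAsMap() or
countByValue(), which merge results into a Map on the driver, and also
look at the raw partition sizes.

// Same aggregation as your workaround, but collected as a list of tuples
// rather than merged into a driver-side Map
rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1L))
   .reduceByKey(Long::sum)
   .collect()
   .forEach(t -> System.out.println(t._1() + " -> " + t._2()));

// Rough skew check: how many records does each partition hold?
JavaRDD<Integer> partitionSizes = rdd.glom().map(List::size);
System.out.println(partitionSizes.collect());

If the collected list sums to the expected 75187 while the Map-based
variants still report around 10519, the records are reaching the executors
intact and the problem sits in the driver-side merge; if a few partitions
dwarf the rest, skew is the first thing to rule out.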

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand expert
opinions" (Wernher von Braun).


On Tue, 27 Feb 2024 at 19:02, Stuart Fehr  wrote:

> Hello, I recently encountered a bug with the results from
> JavaRDD#countByValue that does not reproduce when running locally. For
> background, we are running a Spark 3.5.0 job on AWS EMR 7.0.0.
>
> The code in question is something like this:
>
> JavaRDD<Item> rdd = // ...
> rdd.count();  // 75187
>
> // Get the count broken down by type
> rdd.map(Item::getType).countByValue();
>
> Which gives these results from the resulting Map:
>
> TypeA: 556
> TypeB: 9168
> TypeC: 590
> TypeD: 205
> (total: 10519)
>
> These values are incorrect, since every item has a type defined, so the
> total of all the types should be 75187. When I inspected this stage in the
> Spark UI, I found that it was using 7 executors. Since the value here is
> about 1/7th of the actual expected value, I suspect that there is some
> issue with the way that the executors report their results back to the
> driver. These results for the same code are correct when I run the job in
> local mode ("local[4]"), so it may also have something to do with how data
> is shared across processes.
>
> For workarounds, I have also tried:
>
> rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey();
> rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1L))
>     .reduceByKey(Long::sum).collectAsMap();
>
>
> These yielded the same (incorrect) result.
>
> I did find that using Dataset.groupBy().count() did yield the correct
> results:
>
> TypeA: 3996
> TypeB: 65490
> TypeC: 4224
> TypeD: 1477
>
> So, I have an immediate workaround, but it is somewhat awkward since I
> have to create a Dataframe from a JavaRDD each time.
>
> Am I doing something wrong? Do these methods not work the way that I
> expected them to from reading the documentation? Is this a legitimate bug?
>
> I would be happy to provide more details if that would help in debugging
> this scenario.
>
> Thank you for your time,
> ~Stuart Fehr
>

