Re: [Spark Core] Potential bug in JavaRDD#countByValue

2024-02-27 Thread Mich Talebzadeh
Hi,

Quick observations from what you have provided:

- The observed discrepancy between rdd.count() and
rdd.map(Item::getType).countByValue() in distributed mode suggests a
potential aggregation issue with countByValue(). The correct results in
local mode lend credence to this theory.
- Workarounds using mapToPair() and reduceByKey() produce identical
results, indicating a broader pattern rather than method-specific behaviour.
- Dataset.groupBy().count() yields accurate results, but this method incurs
overhead for the RDD-to-Dataset conversion (a minimal sketch of that
conversion follows below).
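
A minimal sketch of that conversion (assuming Item is a serializable Java
bean with a getType() accessor and that spark is your SparkSession; names
are illustrative, not your actual code):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Build a Dataset from the existing JavaRDD<Item> using the bean schema,
// then count per type with the Dataset API.
Dataset<Row> itemsDf = spark.createDataFrame(rdd, Item.class);
itemsDf.groupBy("type").count().show();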

Your expected total count of 75187 is roughly 7 times the observed count of
10519, which matches the number of your executors (7). This suggests
potentially incorrect or partial aggregation across executors.

Before raising a red flag, these could be the culprits:

- Data skew: an uneven distribution of data across executors could cause
partial aggregation if a single executor processes most items of a
particular type (a quick per-partition check is sketched below).
- Partial aggregation: Spark might be combining partial counts from
executors incorrectly, leading to inaccuracies.
- Finally, a bug in 3.5 is possible.
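
As a quick diagnostic (a sketch only, assuming rdd is the JavaRDD<Item> in
question), you could print how many items land on each partition and check
that the per-partition counts add up to rdd.count(). A heavily lopsided
distribution would point to skew, while a correct sum combined with a wrong
countByValue() would point to the aggregation step itself:

import java.util.Collections;
import java.util.Iterator;
import org.apache.spark.api.java.function.Function2;

// Emit one "partitionIndex:itemCount" string per partition, then collect
// the small result to the driver for inspection.
Function2<Integer, Iterator<Item>, Iterator<String>> perPartitionCount =
    (idx, it) -> {
        long n = 0;
        while (it.hasNext()) { it.next(); n++; }
        return Collections.singletonList(idx + ":" + n).iterator();
    };
rdd.mapPartitionsWithIndex(perPartitionCount, false)
   .collect()
   .forEach(System.out::println);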

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


view my LinkedIn profile
https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner von Braun).


On Tue, 27 Feb 2024 at 19:02, Stuart Fehr  wrote:

> Hello, I recently encountered a bug with the results from
> JavaRDD#countByValue that does not reproduce when running locally. For
> background, we are running a Spark 3.5.0 job on AWS EMR 7.0.0.
>
> The code in question is something like this:
>
> JavaRDD<Item> rdd = // ...
> rdd.count();  // 75187
>
> // Get the count broken down by type
> rdd.map(Item::getType).countByValue();
>
>
> Which gives these results from the resulting Map:
>
> TypeA: 556
> TypeB: 9168
> TypeC: 590
> TypeD: 205
> (total: 10519)
>
> These values are incorrect, since every item has a type defined, so the
> total of all the types should be 75187. When I inspected this stage in the
> Spark UI, I found that it was using 7 executors. Since the value here is
> about 1/7th of the actual expected value, I suspect that there is some
> issue with the way that the executors report their results back to the
> driver. These results for the same code are correct when I run the job in
> local mode ("local[4]"), so it may also have something to do with how data
> is shared across processes.
>
> For workarounds, I have also tried:
>
> rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey();
> rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1L)).reduceByKey(Long::sum).collectAsMap();
>
>
> These yielded the same (incorrect) result.
>
> I did find that using Dataset.groupBy().count() did yield the correct
> results:
>
> TypeA: 3996
> TypeB: 65490
> TypeC: 4224
> TypeD: 1477
>
> So, I have an immediate workaround, but it is somewhat awkward since I
> have to create a Dataframe from a JavaRDD each time.
>
> Am I doing something wrong? Do these methods not work the way that I
> expected them to from reading the documentation? Is this a legitimate bug?
>
> I would be happy to provide more details if that would help in debugging
> this scenario.
>
> Thank you for your time,
> ~Stuart Fehr
>


[Spark Core] Potential bug in JavaRDD#countByValue

2024-02-27 Thread Stuart Fehr
Hello, I recently encountered a bug with the results from
JavaRDD#countByValue that does not reproduce when running locally. For
background, we are running a Spark 3.5.0 job on AWS EMR 7.0.0.

The code in question is something like this:

JavaRDD<Item> rdd = // ...
rdd.count();  // 75187

// Get the count broken down by type
rdd.map(Item::getType).countByValue();


Which gives these results from the resulting Map:

TypeA: 556
TypeB: 9168
TypeC: 590
TypeD: 205
(total: 10519)

These values are incorrect, since every item has a type defined, so the
total of all the types should be 75187. When I inspected this stage in the
Spark UI, I found that it was using 7 executors. Since the value here is
about 1/7th of the actual expected value, I suspect that there is some
issue with the way that the executors report their results back to the
driver. These results for the same code are correct when I run the job in
local mode ("local[4]"), so it may also have something to do with how data
is shared across processes.

For workarounds, I have also tried:

rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey();
rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1L)).reduceByKey(Long::sum).collectAsMap();


These yielded the same (incorrect) result.

I did find that using Dataset.groupBy().count() did yield the correct
results:

TypeA: 3996
TypeB: 65490
TypeC: 4224
TypeD: 1477

So, I have an immediate workaround, but it is somewhat awkward since I have
to create a Dataframe from a JavaRDD each time.

Am I doing something wrong? Do these methods not work the way that I
expected them to from reading the documentation? Is this a legitimate bug?

I would be happy to provide more details if that would help in debugging
this scenario.

Thank you for your time,
~Stuart Fehr


Re: Issue of spark with antlr version

2024-02-27 Thread Bjørn Jørgensen
[SPARK-44366][BUILD] Upgrade antlr4 to 4.13.1



On Tue, 27 Feb 2024 at 13:25, Sahni, Ashima wrote:

> Hi Team,
>
>
>
> Can you please let us know the update on below.
>
>
>
> Thanks,
>
> Ashima
>
>
>
> *From:* Chawla, Parul 
> *Sent:* Sunday, February 25, 2024 11:57 PM
> *To:* user@spark.apache.org
> *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Issue of spark with antlr version
>
>
>
> Hi Spark Team,
>
>
>
>
>
> Our application is currently using Spring Framework 5.3.31. To upgrade it
> to 6.x, per application dependencies we must upgrade the Spark and
> Hibernate jars as well.
>
> With the Hibernate-compatible upgrade, the dependent Antlr4 jar version has
> been upgraded to 4.10.1, but there is no Spark version available with the
> upgraded Antlr4 jar.
>
> Can you please let us know when an updated Spark version with the upgraded
> Antlr4 will be available?
>
>
>
>
>
> Regards,
>
> Parul
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Issue of spark with antlr version

2024-02-27 Thread Mich Talebzadeh
Hi,

You have provided little information about where Spark fits in here. So I
am guessing :)

Data Source (JSON, XML, log file, etc.) -->
Preprocessing (Spark jobs for filtering, cleaning, etc.)? -->
Antlr Parser (Generated tool) -->
Extracted Data (Mapped to model) -->
Spring Data Model (Java objects) -->
Spring Application Logic (Controllers, Services, Repositories)

etc. Is this a good guess?

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


view my LinkedIn profile
https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner von Braun).


On Tue, 27 Feb 2024 at 12:25, Sahni, Ashima wrote:

> Hi Team,
>
>
>
> Can you please let us know the update on below.
>
>
>
> Thanks,
>
> Ashima
>
>
>
> *From:* Chawla, Parul 
> *Sent:* Sunday, February 25, 2024 11:57 PM
> *To:* user@spark.apache.org
> *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Issue of spark with antlr version
>
>
>
> Hi Spark Team,
>
>
>
>
>
> Our application is currently using Spring Framework 5.3.31. To upgrade it
> to 6.x, per application dependencies we must upgrade the Spark and
> Hibernate jars as well.
>
> With the Hibernate-compatible upgrade, the dependent Antlr4 jar version has
> been upgraded to 4.10.1, but there is no Spark version available with the
> upgraded Antlr4 jar.
>
> Can you please let us know when an updated Spark version with the upgraded
> Antlr4 will be available?
>
>
>
>
>
> Regards,
>
> Parul
>


RE: Issue of spark with antlr version

2024-02-27 Thread Sahni, Ashima
Hi Team,

Can you please let us know the update on below.

Thanks,
Ashima

From: Chawla, Parul 
Sent: Sunday, February 25, 2024 11:57 PM
To: user@spark.apache.org
Cc: Sahni, Ashima ; Misra Parashar, Jyoti 

Subject: Issue of spark with antlr version

Hi Spark Team,


Our application is currently using Spring Framework 5.3.31. To upgrade it to
6.x, per application dependencies we must upgrade the Spark and Hibernate jars
as well.
With the Hibernate-compatible upgrade, the dependent Antlr4 jar version has
been upgraded to 4.10.1, but there is no Spark version available with the
upgraded Antlr4 jar.
Can you please let us know when an updated Spark version with the upgraded
Antlr4 will be available?


Regards,
Parul



This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Where allowed by local law, electronic 
communications with Accenture and its affiliates, including e-mail and instant 
messaging (including content), may be scanned by our systems for the purposes 
of information security, AI-powered support capabilities, and assessment of 
internal compliance with Accenture policy. Your privacy is important to us. 
Accenture uses your personal data only in compliance with data protection laws. 
For further information on how Accenture processes your personal data, please 
see our privacy statement at https://www.accenture.com/us-en/privacy-policy.
__

www.accenture.com


Unsubscribe

2024-02-27 Thread benson fang
Unsubscribe

Regards


Re: Bugs with joins and SQL in Structured Streaming

2024-02-27 Thread Andrzej Zera
Hi,

Yes, I tested all of them on spark 3.5.

Regards,
Andrzej


On Mon, 26 Feb 2024 at 23:24, Mich Talebzadeh wrote:

> Hi,
>
> These are all on spark 3.5, correct?
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
> view my LinkedIn profile
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand
> expert opinions" (Werner von Braun).
>
>
> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera  wrote:
>
>> Hey all,
>>
>> I've been using Structured Streaming in production for almost a year
>> already and I want to share the bugs I found in this time. I created a test
>> for each of the issues and put them all here:
>> https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>
>> I split the issues into three groups: outer joins on event time, interval
>> joins and Spark SQL.
>>
>> Issues related to outer joins:
>>
>>- When joining three or more input streams on event time, if two or
>>more streams don't contain an event for a join key (which is event time),
>>no row will be output even if other streams contain an event for this join
>>key. Tests that check for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
>>and
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>>- When joining a stream aggregated from raw events with a stream of
>>already-aggregated events (aggregation done outside of Spark), no row
>>will be output if that second stream doesn't contain a corresponding event.
>>Test that checks for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
>>- When joining two aggregated streams (aggregated in Spark), no
>>result is produced. Test that checks for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
>>I've already reported this one here:
>>https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been
>>handled yet.
>>
>> Issues related to interval joins:
>>
>>- When joining three streams (A, B, C) using an interval join on event
>>time, such that B.eventTime is conditioned on A.eventTime and
>>C.eventTime is also conditioned on A.eventTime, and then doing window
>>aggregation based on A's event time, the result is output only after the
>>watermark crosses the window end + interval(A, B) + interval(A, C).
>>However, I'd expect results to be output faster, i.e. when the watermark
>>crosses window end + MAX(interval(A, B), interval(A, C)). If our case is
>>that event B can happen 3 minutes after event A and event C can happen 5
>>minutes after A, there is no point in suspending output for 8 minutes
>>(3+5) after the end of the window if we know that no more events can be
>>matched after 5 min from the window end (assuming the window end is based
>>on A's event time). Test that checks for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
>>
>> SQL issues:
>>
>>- WITH clause (in contrast to subquery) seems to create a static
>>DataFrame that can't be used in streaming joins. Test that checks for 
>> this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
>>- Two subqueries, each aggregating data using the window() function,
>>break the output schema. Test that checks for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
>>
>> I'm a beginner with Scala (I'm using Structured Streaming with PySpark)
>> so won't be able to provide fixes. But I hope the test cases I provided can
>> be of some help.
>>
>> Regards,
>> Andrzej
>>
>