Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-19 Thread Mich Talebzadeh

Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-19 Thread Saha, Daniel
Thanks for the suggestions Mich, Jörn, and Adam.

The rationale for a long-lived app with a loop, versus submitting multiple YARN
applications, is mainly simplicity. I plan to run the app on a multi-tenant EMR
cluster alongside other YARN apps. Implementing the loop outside the Spark app
will work, but it introduces more complexity than a single long-lived Spark
app with dynamic allocation + min executors. Specifically, I would need to:

  *   Introduce a component that submits an EMR step to run `spark-submit`
  *   Define a YARN queue for my app so that resources are reserved and other
tenants will not prevent my app from entering the RUNNING state
  *   Determine whether the previous YARN app is FINISHED (or just submit a
bunch of apps up front and rely on the YARN SUBMITTED/ACCEPTED states)
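For the last step in the list above, one way to tell whether the previous application has finished is to poll the YARN ResourceManager REST API (`GET /ws/v1/cluster/apps/{appid}`), whose `app.state` field ends in `FINISHED`, `FAILED` or `KILLED`. This is a minimal sketch, assuming an unsecured ResourceManager endpoint and that the application ID of the previous run was captured at submission time; `fetch_app` and `wait_for_terminal_state` are hypothetical helper names.

```python
import json
import time
import urllib.request
from typing import Callable

# Terminal YARN application states, as reported in the "state" field of the
# ResourceManager REST API (GET /ws/v1/cluster/apps/{appid}).
TERMINAL_STATES = frozenset({"FINISHED", "FAILED", "KILLED"})


def fetch_app(rm_url: str, app_id: str) -> dict:
    """Fetch one application report from the ResourceManager REST API."""
    url = f"{rm_url.rstrip('/')}/ws/v1/cluster/apps/{app_id}"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def wait_for_terminal_state(
    fetch: Callable[[], dict],
    poll_seconds: float = 30.0,
    max_polls: int = 240,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the application reaches a terminal state and return it."""
    for _ in range(max_polls):
        state = fetch()["app"]["state"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)  # still NEW/SUBMITTED/ACCEPTED/RUNNING
    raise TimeoutError("YARN application did not reach a terminal state")
```

Usage, with a placeholder address: `wait_for_terminal_state(lambda: fetch_app("http://<rm-host>:8088", app_id))`. Injecting `fetch` and `sleep` keeps the polling loop testable without a cluster.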

So I was really hoping to be able to recreate the SparkContext, or at least
find some way to trigger a cleanup of the DiskBlockManager between loop
iterations. If there is no way to do this, I will test the performance of
cloud-based shuffle. This might be better for cost savings too (S3 vs. EBS)
and would allow me to use smaller instances (I was using beefy instances and
beefy executors to improve shuffle locality).

To the other points:

  1.  Dynamic allocation is enabled, but I suspect it is not the issue here.
Enabling `spark.shuffle.service.removeShuffle` didn’t seem to help much,
likely because executors are not decommissioned often, given the tight loop
and the fact that the executor idle timeout was already raised from the 60s
default to 300s.
  2.  Cloud shuffle + an S3 lifecycle policy, or brute-force/cron removal of
files, will certainly work, but I am looking for something more “elegant”.
  3.  “Shuffle data should be deleted once it’s no longer needed”: from my
understanding of the Spark codebase, the only time the DiskBlockManager cleans
the `spark.local.dir` directory [1] is when stop() is called, which only
happens when the SparkEnv is stopped [2].
  4.  I suspect spilled data is not what’s filling up the disk, since the app
barely spills to disk [3]. Also supporting this hypothesis: raising
`spark.shuffle.sort.bypassMergeThreshold` above the maximum number of reducer
partitions significantly slowed the rate of disk usage.
Daniel

[1] https://github.com/apache/spark/blob/8f5a647b0bbb6e83ee484091d3422b24baea5a80/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L369
[2] https://github.com/apache/spark/blob/c4e4497ff7e747eb71d087cdfb1b51673c53b83b/core/src/main/scala/org/apache/spark/SparkEnv.scala#L112
[3] I was able to eliminate most of the skew during repartitionByRange by
dynamically salting keys using the results of df.stat.countMinSketch.
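Point 4 can be checked directly on an executor host by classifying what sits in the `blockmgr-*` directory. This sketch assumes Spark's block file naming from its `BlockId` classes: `shuffle_<shuffleId>_<mapId>_<reduceId>.data` / `.index` (plus `.checksum.*` files in recent versions) for shuffle output, and `temp_shuffle_*` / `temp_local_*` for spills and other temporary writes; check the patterns against your Spark version. `summarize_block_dir` is a hypothetical helper.

```python
import os
import re
from collections import Counter
from typing import Tuple

# Assumed naming, per Spark's BlockId classes:
#   shuffle_<shuffleId>_<mapId>_<reduceId>.data / .index / .checksum.<ALGO>
#   temp_shuffle_<uuid>, temp_local_<uuid>   (spills and temporary writes)
SHUFFLE_RE = re.compile(r"^shuffle_(\d+)_(\d+)_(\d+)\.(data|index|checksum)")
TEMP_PREFIXES = ("temp_shuffle_", "temp_local_")


def summarize_block_dir(root: str) -> Tuple[Counter, Counter]:
    """Return (bytes per file category, shuffle bytes per shuffle ID)."""
    by_kind: Counter = Counter()
    by_shuffle: Counter = Counter()
    # Block files are spread over hashed subdirectories, so walk the tree.
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            try:
                size = os.path.getsize(os.path.join(dirpath, name))
            except FileNotFoundError:
                continue  # deleted while we were walking
            match = SHUFFLE_RE.match(name)
            if match:
                by_kind[f"shuffle_{match.group(4)}"] += size
                by_shuffle[int(match.group(1))] += size
            elif name.startswith(TEMP_PREFIXES):
                by_kind["temp_spill"] += size
            else:
                by_kind["other"] += size
    return by_kind, by_shuffle
```

Run against the `blockmgr-*` directory under the YARN appcache path from the original question, a set of shuffle IDs that keeps growing across loop iterations would point at retained shuffle output rather than spills.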



Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-18 Thread Mich Talebzadeh
Hi,

What do you propose, or think will help, given that these Spark jobs are
independent of each other, i.e. once a job/iteration is complete there is
no need to retain these shuffle files? You have a number of options to
consider, starting with the Spark configuration parameters for shuffle
behavior:

https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior

Aside from that, have you turned on dynamic resource allocation and the
relevant parameters? Can you increase executor memory
(spark.storage.memoryFraction) and spark.shuffle.spillThreshold as well? You
can of course use brute force with shutil.rmtree(path) to remove these files.
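A slightly more guarded variant of the brute-force approach, sketched under assumptions: instead of `shutil.rmtree` on the whole directory, which would also remove the files of the shuffle currently running and the directory the live DiskBlockManager expects, delete only files that have not been modified for a given age, with a dry run by default. File age is only a heuristic for "no longer needed"; deleting a shuffle file that a later stage or a task retry still reads would likely surface as a fetch failure and recomputation. `remove_stale_files` is a hypothetical helper.

```python
import os
import time
from typing import List, Optional


def remove_stale_files(root: str, max_age_seconds: float,
                       dry_run: bool = True,
                       now: Optional[float] = None) -> List[str]:
    """Delete regular files under `root` not modified for `max_age_seconds`.

    Directories are left in place. Returns the paths that were removed, or
    that would be removed when `dry_run` is True.
    """
    now = time.time() if now is None else now
    stale = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                if now - os.path.getmtime(path) < max_age_seconds:
                    continue  # recently written: may belong to a live shuffle
                stale.append(path)
                if not dry_run:
                    os.remove(path)
            except FileNotFoundError:
                continue  # removed by Spark (or another process) meanwhile
    return stale
```

Running it first with `dry_run=True` and comparing the result against the iteration boundaries of the loop is a cheap way to check the age threshold before deleting anything.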

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, one verified and tested result holds more weight
than a thousand expert opinions.


On Sat, 17 Feb 2024 at 23:40, Saha, Daniel wrote:

> Hi,
>
> *Background*: I am running into executor disk space issues when running a
> long-lived Spark 3.3 app with YARN on AWS EMR. The app performs
> back-to-back Spark jobs in a sequential loop, with each iteration performing
> 100 GB+ shuffles. The files taking up the space are related to shuffle
> blocks [1]. Disk is only cleared when restarting the YARN app. For all
> intents and purposes, each job is independent, so once a job/iteration is
> complete, there is no need to retain these shuffle files. I want to try
> stopping and recreating the SparkContext between loop iterations/jobs to
> signal to Spark's DiskBlockManager that these intermediate results are no
> longer needed [2].
>
> *Questions*:
>
>- Are there better ways to remove/clean the directory containing these
>old, no longer used shuffle results (aside from cron or restarting the YARN
>app)?
>- How can I recreate the SparkContext within a single application? I see
>no methods in SparkSession for doing this, and each new SparkSession
>reuses the existing SparkContext. After stopping the SparkContext,
>SparkSession does not create a new one. Further, creating a new
>SparkSession via its constructor and passing in a new SparkContext is not
>allowed, as the constructor is protected/private.
>
> Thanks
>
> Daniel
>
> [1]
> /mnt/yarn/usercache/hadoop/appcache/application_1706835946137_0110/blockmgr-eda47882-56d6-4248-8e30-a959ddb912c5
> [2] https://stackoverflow.com/a/38791921
>
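A minimal sketch of the stop-and-recreate pattern for the second question. It assumes that after `SparkSession.stop()` (which also stops the underlying SparkContext), a later `SparkSession.builder.getOrCreate()` builds a fresh SparkContext; verify this on your Spark version and deploy mode. On YARN, a new SparkContext generally corresponds to a new YARN application, at least in client mode, so this may bring back the queueing concerns raised in Daniel's follow-up; in cluster mode a second context in the same driver may not be supported at all. Whether executor-side shuffle directories are deleted when the context stops may also depend on the deployment (for example, on whether the external shuffle service is in use). The session factory is injected so the loop can be exercised without a cluster; `run_with_fresh_sessions` is a hypothetical helper.

```python
from typing import Callable, Iterable, Protocol


class StoppableSession(Protocol):
    """The only part of a SparkSession this loop relies on."""

    def stop(self) -> None: ...


def run_with_fresh_sessions(
    make_session: Callable[[], StoppableSession],
    jobs: Iterable[Callable[[StoppableSession], None]],
) -> int:
    """Run each job on its own session and stop that session afterwards.

    Stopping the session stops its SparkContext and SparkEnv; how much
    on-disk shuffle data that removes depends on the deployment.
    Returns the number of jobs that ran.
    """
    completed = 0
    for job in jobs:
        session = make_session()
        try:
            job(session)
        finally:
            session.stop()  # runs even if the job raised
        completed += 1
    return completed


# Hypothetical usage with PySpark (requires a Spark installation):
# from pyspark.sql import SparkSession
# run_with_fresh_sessions(
#     lambda: SparkSession.builder.appName("loop").getOrCreate(),
#     [job_a, job_b],  # each job takes the session as its only argument
# )
```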


Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-17 Thread Jörn Franke
You can try shuffling to S3 using the Cloud Shuffle Storage Plugin for Apache
Spark
(https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/).
The performance of the new plugin is sufficient for many Spark jobs, and it
also works on EMR. You can then use S3 lifecycle policies to expire objects
older than one day
(https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html);
this also cleans up files left behind by crashed Spark jobs.
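A minimal sketch of such a lifecycle rule, in the shape accepted by boto3's `put_bucket_lifecycle_configuration`. The bucket name and the `spark-shuffle/` prefix are hypothetical; the prefix would need to match wherever the plugin is configured to write shuffle data (see the blog post for its settings). Note that this call replaces the bucket's entire existing lifecycle configuration, so any existing rules have to be merged in first.

```python
def shuffle_expiry_lifecycle(prefix: str, days: int = 1) -> dict:
    """Build an S3 lifecycle configuration expiring objects under `prefix`."""
    if days < 1:
        raise ValueError("S3 expiration must be at least 1 day")
    return {
        "Rules": [
            {
                "ID": "expire-spark-shuffle",
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                # Delete shuffle objects `days` after creation, including
                # leftovers from crashed jobs.
                "Expiration": {"Days": days},
                # Also clean up multipart uploads that never completed.
                "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": days},
            }
        ]
    }


# Applying it (requires boto3 and AWS credentials; bucket name is hypothetical):
# import boto3
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="my-shuffle-bucket",
#     LifecycleConfiguration=shuffle_expiry_lifecycle("spark-shuffle/"),
# )
```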

For shuffle on local disk you don't have many choices, as you mentioned. I
would, though, avoid a long-lived app that loops; that never works very well
on Spark (it is designed for batch jobs that eventually stop). Maybe you can
simply trigger a new job when a new file arrives (S3 events?).
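One way to trigger a new job per arriving file, sketched under assumptions: an S3 event notification invokes an AWS Lambda function, which adds one EMR step per new object running `spark-submit` through `command-runner.jar`. `CLUSTER_ID`, `SCRIPT_URI` and the convention of passing the object's S3 URI as the job's only argument are hypothetical.

```python
from urllib.parse import unquote_plus

# Hypothetical values; replace with your cluster ID and job script.
CLUSTER_ID = "j-XXXXXXXXXXXXX"
SCRIPT_URI = "s3://my-bucket/jobs/process_file.py"


def steps_for_s3_event(event: dict, script_uri: str) -> list:
    """Build one EMR step definition per object in an S3 event notification."""
    steps = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded (spaces as '+') in S3 notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        steps.append({
            "Name": f"process {key}"[:256],  # EMR step names are length-limited
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["spark-submit", "--deploy-mode", "cluster",
                         script_uri, f"s3://{bucket}/{key}"],
            },
        })
    return steps


def lambda_handler(event, context):
    """AWS Lambda entry point: submit the steps to a running EMR cluster."""
    import boto3  # bundled with the AWS Lambda Python runtimes

    steps = steps_for_s3_event(event, SCRIPT_URI)
    if steps:
        boto3.client("emr").add_job_flow_steps(JobFlowId=CLUSTER_ID, Steps=steps)
    return {"submitted": len(steps)}
```

Each file then gets its own short-lived Spark application, so its shuffle data disappears with that application instead of accumulating in a long-lived one.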



Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-17 Thread Adam Binford
If you're using dynamic allocation, it could be caused by executors with
shuffle data being deallocated before the shuffle is cleaned up. Once that
happens, these shuffle files will never get cleaned up until the YARN
application ends. This was a big issue for us, so I added support for
deleting shuffle data of deallocated executors via the shuffle service. It
landed in Spark 3.3 but is disabled by default. See
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala#L698

spark.shuffle.service.removeShuffle
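For reference, a sketch of the settings involved, rendered as `spark-submit --conf` arguments. `spark.shuffle.service.removeShuffle` only applies when the external shuffle service is in use; the 300s idle timeout is the value Daniel mentions elsewhere in the thread, and the helper name `to_conf_args` is hypothetical.

```python
# Settings discussed in this thread; values are illustrative.
SHUFFLE_CLEANUP_CONF = {
    "spark.dynamicAllocation.enabled": "true",
    # External shuffle service: serves shuffle files of removed executors.
    "spark.shuffle.service.enabled": "true",
    # Spark 3.3+, off by default: let the shuffle service delete shuffle data
    # of deallocated executors once the shuffle is no longer needed.
    "spark.shuffle.service.removeShuffle": "true",
    # Value from this thread (the default is 60s).
    "spark.dynamicAllocation.executorIdleTimeout": "300s",
}


def to_conf_args(conf: dict) -> list:
    """Render a settings dict as `spark-submit --conf key=value` arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args.extend(["--conf", f"{key}={value}"])
    return args
```

The resulting list can be spliced into a `spark-submit` command line, for example into the `Args` of an EMR step.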

If you're not using dynamic allocation, then I'm not sure: shuffle data
should be deleted once it's no longer needed (through the garbage-collection
mechanisms that track references to the shuffle). Maybe just make sure any
variables referencing the first DataFrame go out of scope.
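The reference-based cleanup described here is done by Spark's ContextCleaner on the driver: it holds weak references to shuffle dependencies and asks executors to delete the corresponding shuffle files once the driver JVM has garbage-collected them. On a large driver heap that GC may rarely run, which is why Spark also triggers a periodic GC (`spark.cleaner.periodicGC.interval`, default 30min). The toy model below is a Python analogy, not Spark code: `weakref.finalize` deletes a directory when its owning object becomes unreachable, so holding a reference (like a DataFrame variable that never goes out of scope) keeps the files on disk. `ToyShuffle` and `run_iteration` are hypothetical names.

```python
import os
import shutil
import tempfile
import weakref


class ToyShuffle:
    """Toy stand-in for a shuffle: owns a directory of intermediate files."""

    def __init__(self) -> None:
        self.path = tempfile.mkdtemp(prefix="toy_shuffle_")
        # Cleanup is tied to reachability, loosely like ContextCleaner's weak
        # references: the callback runs once this object is garbage-collected.
        weakref.finalize(self, shutil.rmtree, self.path, ignore_errors=True)

    def write(self, name: str, data: bytes) -> None:
        with open(os.path.join(self.path, name), "wb") as f:
            f.write(data)


def run_iteration() -> str:
    """One loop iteration whose shuffle object is only held in a local."""
    shuffle = ToyShuffle()
    shuffle.write("part-0.data", b"x" * 1024)
    return shuffle.path  # the object itself becomes unreachable on return
```

In CPython the callback runs as soon as the last reference is dropped; the JVM only notices at its next collection, so lowering `spark.cleaner.periodicGC.interval` may help cleanup keep pace with a tight loop (untested here).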

Adam



-- 
Adam Binford