Hi,

The crux of the matter here, as I understand it, is: "How should I be using Executor Rolling without triggering stage failures?"
The object of executor rolling on K8s is to periodically replace executors with new ones (via decommissioning) while minimizing the impact on running tasks and stages. As you mentioned:

spark.plugins: "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin"
spark.kubernetes.executor.rollInterval: "1800s"
spark.kubernetes.executor.rollPolicy: "OUTLIER_NO_FALLBACK"
spark.kubernetes.executor.minTasksPerExecutorBeforeRolling: "100"

You will need to ensure that the decommissioning of executors is done gracefully. As in classic Spark, data and tasks being handled by a decommissioned executor should be properly redistributed to active executors before that executor is removed; otherwise you are going to have issues.

You also need to keep an eye on fetch failures during rolling. These can happen if tasks attempt to fetch data from decommissioned executors before the data has been redistributed. A possible remedy is to set spark.stage.ignoreDecommissionFetchFailure to "true" (as you have correctly pointed out) to tell Spark to ignore fetch failures from decommissioned executors and retry the affected tasks on the remaining active executors as per the norm. This will incur additional computation, as expected, but will ensure data integrity.

In general, other parameter settings such as spark.kubernetes.executor.minTasksPerExecutorBeforeRolling need to be tried against your workload; it is practically impossible to guess optimum values upfront. This parameter controls the minimum number of tasks an executor must have completed before it is rolled. Two sketches follow: one wiring these settings up programmatically, and one for watching fetch failures and executor removals as you tune.
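For experimenting with these knobs together, here is a minimal sketch (Spark 3.4.x, Scala) that wires the same settings up through SparkSession.builder rather than operator YAML. The app name and the fallback path are placeholders; everything else mirrors the settings you quoted:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("executor-rolling-demo") // placeholder name
  // Roll one executor every 30 minutes, picking outliers only
  // (OUTLIER_NO_FALLBACK does nothing when no outlier is found).
  .config("spark.plugins", "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin")
  .config("spark.kubernetes.executor.rollInterval", "1800s")
  .config("spark.kubernetes.executor.rollPolicy", "OUTLIER_NO_FALLBACK")
  // Never roll an executor that has completed fewer than 100 tasks.
  .config("spark.kubernetes.executor.minTasksPerExecutorBeforeRolling", "100")
  // Graceful decommissioning: migrate blocks before the pod is removed.
  .config("spark.decommission.enabled", "true")
  .config("spark.storage.decommission.enabled", "true")
  .config("spark.storage.decommission.fallbackStorage.path", "s3a://some-bucket/spark-fallback/") // placeholder path
  .config("spark.storage.decommission.shuffleBlocks.maxThreads", "16")
  // Retry tasks instead of failing the stage when a fetch hits a
  // decommissioned executor.
  .config("spark.stage.ignoreDecommissionFetchFailure", "true")
  .config("spark.scheduler.maxRetainedRemovedDecommissionExecutors", "20")
  .getOrCreate()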
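And to keep that eye on fetch failures during rolling, a small SparkListener can surface them in the driver log next to executor removals, so you can correlate failures with the roll interval. This is my own sketch, not part of the plugin; it only logs, and it matches on the DeveloperApi FetchFailed task-end reason:

import org.apache.spark.FetchFailed
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved, SparkListenerTaskEnd}

class RollingWatchListener extends SparkListener {
  // Log every fetch failure with the block manager it tried to fetch from.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
    case f: FetchFailed =>
      println(s"FetchFailed in stage ${taskEnd.stageId}: ${f.message} (block manager ${f.bmAddress})")
    case _ => // ignore successful and otherwise-failed tasks
  }
  // Log executor removals so rolls are visible alongside the failures.
  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
    println(s"Executor ${removed.executorId} removed: ${removed.reason}")
}

// Register it once after the session is up:
// spark.sparkContext.addSparkListener(new RollingWatchListener)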
HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Fri, 25 Aug 2023 at 17:48, Arun Ravi <arunrav...@gmail.com> wrote:

> Hi Team,
> I am running Apache Spark 3.4.1 Application on K8s with the below
> configuration related to executor rolling and Ignore Decommission Fetch
> Failure.
>
> spark.plugins: "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin"
> spark.kubernetes.executor.rollInterval: "1800s"
> spark.kubernetes.executor.rollPolicy: "OUTLIER_NO_FALLBACK"
> spark.kubernetes.executor.minTasksPerExecutorBeforeRolling: "100"
>
> spark.stage.ignoreDecommissionFetchFailure: "true"
> spark.scheduler.maxRetainedRemovedDecommissionExecutors: "20"
>
> spark.decommission.enabled: "true"
> spark.storage.decommission.enabled: "true"
> spark.storage.decommission.fallbackStorage.path: "some-s3-path"
> spark.storage.decommission.shuffleBlocks.maxThreads: "16"
>
> When an executor is decommissioned in the middle of the stage, I notice
> that there are shuffle fetch failures in tasks and the above ignore
> decommission configurations are not respected. The stage will go into
> retry. The decommissioned executor logs clearly show the decommission was
> fully graceful and blocks were replicated to other active
> executors/fallback.
>
> May I know how I should be using Executor Rolling, without triggering
> stage failures? I am using executor rolling to avoid executors being
> removed by K8s due to memory pressure or OOM issues as my Spark job is
> heavy on shuffling and has a lot of window functions. Any help will be
> super useful.
>
> Arun Ravi M V
> B.Tech (Batch: 2010-2014)
> Computer Science and Engineering
> Govt. Model Engineering College
> Cochin University Of Science And Technology
> Kochi