Hi Team,

Thank you for clarifying the decommission ignore-fetch-failure behavior. Previously I was using Executor Rolling together with Decommission and Ignore Decommission Fetch Failure as a solution for all these problems. I now understand that executor rolling must be carefully tuned to minimize fetch failures, with the best-effort ignore feature only as a safety net. Sorry, I have two more follow-up questions.
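To make sure I understand the best-effort semantics, here is my mental model as a tiny plain-Python sketch (NOT Spark code; all names and the counting logic are my own illustration of SPARK-40481): a fetch failure caused by a decommissioned executor still triggers a stage retry, but is not counted against spark.stage.maxConsecutiveAttempts.

```python
# Plain-Python sketch (NOT Spark source) of how I understand
# spark.stage.ignoreDecommissionFetchFailure: decommission-caused
# fetch failures are skipped when counting consecutive stage
# attempts against spark.stage.maxConsecutiveAttempts.
# All names here are illustrative, not real Spark identifiers.

MAX_CONSECUTIVE_ATTEMPTS = 4  # spark.stage.maxConsecutiveAttempts default

def should_abort_stage(failures, ignore_decommission=True):
    """failures: list of booleans, one per fetch-failure-driven stage
    attempt, True if the failure came from a decommissioned executor.
    Returns True if the stage would be aborted."""
    counted = 0
    for from_decommissioned in failures:
        if ignore_decommission and from_decommissioned:
            continue  # best effort: this attempt is not counted
        counted += 1
    return counted >= MAX_CONSECUTIVE_ATTEMPTS

# Four failed attempts, two caused by decommission: only two count,
# so the stage keeps retrying instead of aborting.
print(should_abort_stage([True, False, True, False]))   # False
print(should_abort_stage([True, False, True, False],
                         ignore_decommission=False))    # True
```

Is this roughly right, modulo the "best effort" caveats quoted below?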
- When is the block address broadcast from the master? Is it at the beginning of a shuffle fetch stage, and/or is it refreshed before each shuffle fetch task?
- It would be great if you could point me to this logic in the codebase so that I can read and understand it better.
- If it is refreshed before each task, would using the 'excludeOnFailure' feature (spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor) give better reliability?

Thank you once again.

Arun Ravi M V
B.Tech (Batch: 2010-2014)
Computer Science and Engineering
Govt. Model Engineering College
Cochin University Of Science And Technology
Kochi

On Sat, 26 Aug 2023 at 05:49, Dongjoon Hyun <dongjoon.h...@gmail.com> wrote:
> Hi, Arun.
>
> Here are some answers to your questions.
>
> First, the fetch failure is irrelevant to the Executor Rolling feature
> because the plugin itself only asks the Spark scheduler to decommission
> the executor, not terminate it. More specifically, it is independent of
> the underlying decommissioning feature's behavior. FYI, the following is
> the code. In other words, it is entirely a behavior of the storage
> decommissioning feature and the
> `spark.stage.ignoreDecommissionFetchFailure` configuration.
>
> https://github.com/apache/spark/blob/12f3c81c26ef639842b8a155e5fd5ccfa7705bea/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala#L84
>
> Second, regarding your comment below:
> `spark.stage.ignoreDecommissionFetchFailure` is not designed to prevent
> FetchFailure. As the documentation says, it tries to ignore stage fetch
> failures caused by executor decommission when counting
> spark.stage.maxConsecutiveAttempts. Here is the SPARK-40481 PR for
> details.
>
> > I notice that there are shuffle fetch failures in tasks and the above
> > ignore decommission configurations are not respected. The stage will
> > go into retry.
> > The decommissioned executor logs clearly show the decommission was
> > fully graceful and blocks were replicated to other active
> > executors/fallback.
>
> https://github.com/apache/spark/pull/37924
> [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned
> executor
>
> Lastly, SPARK-40481 was not designed as a silver bullet from the
> beginning. Instead, it was a best-effort approach at that time. The
> limitation was pointed out during the PR review, and the PR description
> carries the following warning:
>
> > Fetch failure might not be ignored when executors are in the below
> > conditions, but this is a best-effort approach based on the current
> > mechanism.
> > - Stopped or terminated after finishing decommission
> > - Under decommission process, then removed by driver for other reasons
>
> Dongjoon.
>
> On Fri, Aug 25, 2023 at 8:21 AM Arun Ravi <arunrav...@gmail.com> wrote:
>> Hi Team,
>>
>> I am running an Apache Spark 3.4.1 application on K8s with the below
>> configuration related to executor rolling and Ignore Decommission Fetch
>> Failure:
>>
>> spark.plugins: "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin"
>> spark.kubernetes.executor.rollInterval: "1800s"
>> spark.kubernetes.executor.rollPolicy: "OUTLIER_NO_FALLBACK"
>> spark.kubernetes.executor.minTasksPerExecutorBeforeRolling: "100"
>>
>> spark.stage.ignoreDecommissionFetchFailure: "true"
>> spark.scheduler.maxRetainedRemovedDecommissionExecutors: "20"
>>
>> spark.decommission.enabled: "true"
>> spark.storage.decommission.enabled: "true"
>> spark.storage.decommission.fallbackStorage.path: "some-s3-path"
>> spark.storage.decommission.shuffleBlocks.maxThreads: "16"
>>
>> When an executor is decommissioned in the middle of a stage, I notice
>> that there are shuffle fetch failures in tasks and the above ignore
>> decommission configurations are not respected. The stage will go into
>> retry.
>> The decommissioned executor logs clearly show the decommission was
>> fully graceful and blocks were replicated to other active
>> executors/fallback.
>>
>> May I know how I should be using Executor Rolling without triggering
>> stage failures? I am using executor rolling to avoid executors being
>> removed by K8s due to memory pressure or OOM issues, as my Spark job is
>> heavy on shuffling and has a lot of window functions. Any help will be
>> super useful.
>>
>> Arun Ravi M V
>> B.Tech (Batch: 2010-2014)
>> Computer Science and Engineering
>> Govt. Model Engineering College
>> Cochin University Of Science And Technology
>> Kochi
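P.S. In case it helps frame my third question: if the excludeOnFailure route is worth trying, this is the kind of addition I am considering on top of my existing settings above. The keys are the standard spark.excludeOnFailure.* configurations, but the values are illustrative guesses I have not tested:

```yaml
# Hypothetical additions under consideration (untested; values are
# illustrative). Existing rolling/decommission settings stay as-is.
spark.excludeOnFailure.enabled: "true"
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor: "1"
```

The idea would be to stop retrying a task on an executor that has already failed a fetch for it, instead of relying only on the best-effort ignore behavior.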