Hi Team,

Thank you for clarifying the behavior of the ignore-decommission-fetch-failure
setting. Previously I was treating Executor Rolling, Decommission, and Ignore
Decommission Fetch Failure as a solution for all of these problems. I now
understand that Executor Rolling must be carefully tuned to minimize fetch
failures, alongside the best-effort failure-ignore feature. I have two more
follow-up questions.

   - When is the block address broadcast from the master? Is it at the
   beginning of a shuffle fetch stage, or is it refreshed before each
   shuffle fetch task?
      - It would be great if you could point me to this logic in the
      codebase so that I can read and understand it better.
   - If it gets refreshed before each task, would using the
   'excludeOnFailure' feature
   (spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor) give better
   reliability?
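
To make the second question concrete, here is a minimal sketch of how I
would pass these settings to spark-submit. The helper name `to_submit_args`
and the values shown are my own illustrative assumptions, not tuned
recommendations; only the two property names come from the Spark
configuration documentation.

```python
# Illustrative settings only: the values are assumptions, not recommendations.
EXCLUDE_ON_FAILURE_CONF = {
    "spark.excludeOnFailure.enabled": "true",
    "spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor": "1",
}


def to_submit_args(conf):
    """Flatten a config dict into spark-submit `--conf key=value` arguments.

    Hypothetical helper for illustration; keys are sorted so the output is stable.
    """
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    # Print the flags so they can be pasted into a spark-submit command line.
    print(" ".join(to_submit_args(EXCLUDE_ON_FAILURE_CONF)))
```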

Thank you once again.

Arun Ravi M V
B.Tech (Batch: 2010-2014)

Computer Science and Engineering

Govt. Model Engineering College
Cochin University Of Science And Technology
Kochi


On Sat, 26 Aug 2023 at 05:49, Dongjoon Hyun <dongjoon.h...@gmail.com> wrote:

> Hi, Arun.
>
> Here are some answers to your questions.
>
> First, the fetch failure is unrelated to the Executor Rolling feature
> because the plugin itself only asks the Spark scheduler to decommission
> the executor, not to terminate it. More specifically, the plugin is
> independent of the underlying Decommissioning feature's behavior. FYI, the
> following is the code. In other words, the fetch failure is entirely a
> behavior of the storage decommissioning feature and the
> `spark.stage.ignoreDecommissionFetchFailure` configuration.
>
>
> https://github.com/apache/spark/blob/12f3c81c26ef639842b8a155e5fd5ccfa7705bea/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala#L84
>
> Second, regarding your comment quoted below,
> `spark.stage.ignoreDecommissionFetchFailure` is not designed to prevent
> FetchFailure. As the documentation states, it ignores stage fetch
> failures caused by executor decommission when counting toward
> spark.stage.maxConsecutiveAttempts. See the SPARK-40481 PR for details.
>
> > I notice that there are shuffle fetch failures in tasks and the above
> > ignore decommission configurations are not respected. The stage will go
> > into retry. The decommissioned executor logs clearly show the
> > decommission was fully graceful and blocks were replicated to other
> > active executors/fallback.
>
> https://github.com/apache/spark/pull/37924
> [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned
> executor
>
> Lastly, SPARK-40481 was not designed as a silver bullet from the
> beginning. Instead, it was a best effort approach at that time. The
> limitation was pointed out during the PR review and the PR description has
> the following warning.
>
> > Fetch failure might not be ignored when executors are in below condition,
> > but this is best effort approach based on current mechanism.
> > - Stopped or terminated after finishing decommission
> > - Under decommission process, then removed by driver with other reasons
>
>
> Dongjoon.
>
>
>
> On Fri, Aug 25, 2023 at 8:21 AM Arun Ravi <arunrav...@gmail.com> wrote:
>
>> Hi Team,
>> I am running an Apache Spark 3.4.1 application on K8s with the below
>> configuration related to executor rolling and Ignore Decommission Fetch
>> Failure.
>>
>> spark.plugins: "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin"
>> spark.kubernetes.executor.rollInterval: "1800s"
>> spark.kubernetes.executor.rollPolicy: "OUTLIER_NO_FALLBACK"
>> spark.kubernetes.executor.minTasksPerExecutorBeforeRolling: "100"
>>
>> spark.stage.ignoreDecommissionFetchFailure: "true"
>> spark.scheduler.maxRetainedRemovedDecommissionExecutors: "20"
>>
>> spark.decommission.enabled: "true"
>> spark.storage.decommission.enabled: "true"
>> spark.storage.decommission.fallbackStorage.path: "some-s3-path"
>> spark.storage.decommission.shuffleBlocks.maxThreads: "16"
>>
>> When an executor is decommissioned in the middle of the stage, I notice
>> that there are shuffle fetch failures in tasks and the above ignore
>> decommission configurations are not respected. The stage will go into
>> retry. The decommissioned executor logs clearly show the decommission was
>> fully graceful and blocks were replicated to other active
>> executors/fallback.
>>
>> May I know how I should be using Executor Rolling, without triggering
>> stage failures? I am using executor rolling to avoid executors being
>> removed by K8s due to memory pressure or OOM issues as my Spark job is
>> heavy on shuffling and has a lot of window functions. Any help will be
>> super useful.
>>
>>
>>
>> Arun Ravi M V
>> B.Tech (Batch: 2010-2014)
>>
>> Computer Science and Engineering
>>
>> Govt. Model Engineering College
>> Cochin University Of Science And Technology
>> Kochi
>>
>
